enhance: move the lifetime implementation out of server level lifetime (#38442)

issue: #38399

- move the lifetime implementation shared by common code out of the
  server-level lifetime package and into pkg/util/typeutil (a usage
  sketch of the new API follows the diff stats below)

Signed-off-by: chyezh <chyezh@outlook.com>
Zhen Ye 2024-12-17 11:42:44 +08:00 committed by GitHub
parent 28fdbc4e30
commit afac153c26
22 changed files with 225 additions and 133 deletions
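The diff below applies one mechanical change everywhere: the field type lifetime.Lifetime[lifetime.State] becomes *typeutil.Lifetime, the error-returning Add(lifetime.IsWorking) guard becomes a bool-returning Add(typeutil.LifetimeStateWorking), and the now-removed lifetime.Close() call is dropped. As orientation, here is a minimal sketch of the new call pattern; the component type is hypothetical, but the guard/Done/SetState/Wait flow is taken directly from the hunks below.

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// component is a hypothetical stand-in for the producers, clients and
// balancers touched by this commit.
type component struct {
	lifetime *typeutil.Lifetime
}

func newComponent() *component {
	// Replaces lifetime.NewLifetime(lifetime.Working).
	return &component{lifetime: typeutil.NewLifetime()}
}

func (c *component) DoWork() error {
	// Replaces `if c.lifetime.Add(lifetime.IsWorking) != nil`: Add now
	// returns false instead of an error when the state is not Working.
	if !c.lifetime.Add(typeutil.LifetimeStateWorking) {
		return fmt.Errorf("component is closed")
	}
	defer c.lifetime.Done()
	// ... actual work ...
	return nil
}

func (c *component) Close() {
	// Reject new work, then wait for in-flight operations; the old
	// lifetime.Close() call is gone because there is nothing to release.
	c.lifetime.SetState(typeutil.LifetimeStateStopped)
	c.lifetime.Wait()
}

func main() {
	c := newComponent()
	_ = c.DoWork()
	c.Close()
	fmt.Println("closed cleanly")
}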

View File

@ -15,8 +15,8 @@ import (
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var errGracefulShutdown = errors.New("graceful shutdown")
@ -35,7 +35,7 @@ func NewResumableProducer(f factory, opts *ProducerOptions) *ResumableProducer {
cancel: cancel,
stopResumingCh: make(chan struct{}),
resumingExitCh: make(chan struct{}),
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
logger: log.With(zap.String("pchannel", opts.PChannel)),
opts: opts,
@ -63,7 +63,7 @@ type ResumableProducer struct {
// A: cancel the ctx will cancel the underlying running producer.
// Use producer Close is better way to stop producer.
lifetime lifetime.Lifetime[lifetime.State]
lifetime *typeutil.Lifetime
logger *log.MLogger
opts *ProducerOptions
@ -78,7 +78,7 @@ type ResumableProducer struct {
// Produce produces a new message to the log service.
func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMessage) (result *producer.ProduceResult, err error) {
if p.lifetime.Add(lifetime.IsWorking) != nil {
if !p.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, errors.Wrapf(errs.ErrClosed, "produce on closed producer")
}
metricGuard := p.metrics.StartProduce(msg.EstimateSize())
@ -185,7 +185,7 @@ func (p *ResumableProducer) createNewProducer() (producer.Producer, error) {
// gracefulClose gracefully closes the producer.
func (p *ResumableProducer) gracefulClose() error {
p.lifetime.SetState(lifetime.Stopped)
p.lifetime.SetState(typeutil.LifetimeStateStopped)
p.lifetime.Wait()
// close the stop resuming background to avoid create new producer.
close(p.stopResumingCh)

View File

@ -16,9 +16,11 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var ErrWALAccesserClosed = status.NewOnShutdownError("wal accesser closed")
// newWALAccesser creates a new wal accesser.
func newWALAccesser(c *clientv3.Client) *walAccesserImpl {
// Create a new streaming coord client.
@ -26,7 +28,7 @@ func newWALAccesser(c *clientv3.Client) *walAccesserImpl {
// Create a new streamingnode handler client.
handlerClient := handler.NewHandlerClient(streamingCoordClient.Assignment())
return &walAccesserImpl{
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
streamingCoordAssignmentClient: streamingCoordClient,
handlerClient: handlerClient,
producerMutex: sync.Mutex{},
@ -40,7 +42,7 @@ func newWALAccesser(c *clientv3.Client) *walAccesserImpl {
// walAccesserImpl is the implementation of WALAccesser.
type walAccesserImpl struct {
lifetime lifetime.Lifetime[lifetime.State]
lifetime *typeutil.Lifetime
// All services
streamingCoordAssignmentClient client.Client
@ -55,8 +57,8 @@ type walAccesserImpl struct {
// RawAppend writes a record to the log.
func (w *walAccesserImpl) RawAppend(ctx context.Context, msg message.MutableMessage, opts ...AppendOption) (*types.AppendResult, error) {
assertValidMessage(msg)
if err := w.lifetime.Add(lifetime.IsWorking); err != nil {
return nil, status.NewOnShutdownError("wal accesser closed, %s", err.Error())
if !w.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, ErrWALAccesserClosed
}
defer w.lifetime.Done()
@ -66,10 +68,11 @@ func (w *walAccesserImpl) RawAppend(ctx context.Context, msg message.MutableMess
// Read returns a scanner for reading records from the wal.
func (w *walAccesserImpl) Read(_ context.Context, opts ReadOption) Scanner {
if err := w.lifetime.Add(lifetime.IsWorking); err != nil {
newErrScanner(status.NewOnShutdownError("wal accesser closed, %s", err.Error()))
if !w.lifetime.Add(typeutil.LifetimeStateWorking) {
return newErrScanner(ErrWALAccesserClosed)
}
defer w.lifetime.Done()
if opts.VChannel == "" {
return newErrScanner(status.NewInvaildArgument("vchannel is required"))
}
@ -87,8 +90,8 @@ func (w *walAccesserImpl) Read(_ context.Context, opts ReadOption) Scanner {
}
func (w *walAccesserImpl) Txn(ctx context.Context, opts TxnOption) (Txn, error) {
if err := w.lifetime.Add(lifetime.IsWorking); err != nil {
return nil, status.NewOnShutdownError("wal accesser closed, %s", err.Error())
if !w.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, ErrWALAccesserClosed
}
if opts.VChannel == "" {
@ -131,7 +134,7 @@ func (w *walAccesserImpl) Txn(ctx context.Context, opts TxnOption) (Txn, error)
// Close closes all the wal accesser.
func (w *walAccesserImpl) Close() {
w.lifetime.SetState(lifetime.Stopped)
w.lifetime.SetState(typeutil.LifetimeStateStopped)
w.lifetime.Wait()
w.producerMutex.Lock()

View File

@ -18,7 +18,7 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const (
@ -34,7 +34,7 @@ func TestWAL(t *testing.T) {
handler.EXPECT().Close().Return()
w := &walAccesserImpl{
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
streamingCoordAssignmentClient: coordClient,
handlerClient: handler,
producerMutex: sync.Mutex{},

View File

@ -13,8 +13,8 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// NewAssignmentService creates a new assignment service.
@ -23,7 +23,7 @@ func NewAssignmentService(service lazygrpc.Service[streamingpb.StreamingCoordAss
s := &AssignmentServiceImpl{
ctx: ctx,
cancel: cancel,
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
watcher: newWatcher(),
service: service,
resumingExitCh: make(chan struct{}),
@ -38,7 +38,7 @@ func NewAssignmentService(service lazygrpc.Service[streamingpb.StreamingCoordAss
type AssignmentServiceImpl struct {
ctx context.Context
cancel context.CancelFunc
lifetime lifetime.Lifetime[lifetime.State]
lifetime *typeutil.Lifetime
watcher *watcher
service lazygrpc.Service[streamingpb.StreamingCoordAssignmentServiceClient]
resumingExitCh chan struct{}
@ -49,7 +49,7 @@ type AssignmentServiceImpl struct {
// AssignmentDiscover watches the assignment discovery.
func (c *AssignmentServiceImpl) AssignmentDiscover(ctx context.Context, cb func(*types.VersionedStreamingNodeAssignments) error) error {
if c.lifetime.Add(lifetime.IsWorking) != nil {
if !c.lifetime.Add(typeutil.LifetimeStateWorking) {
return status.NewOnShutdownError("assignment service client is closing")
}
defer c.lifetime.Done()
@ -59,7 +59,7 @@ func (c *AssignmentServiceImpl) AssignmentDiscover(ctx context.Context, cb func(
// ReportAssignmentError reports the assignment error to server.
func (c *AssignmentServiceImpl) ReportAssignmentError(ctx context.Context, pchannel types.PChannelInfo, assignmentErr error) error {
if c.lifetime.Add(lifetime.IsWorking) != nil {
if !c.lifetime.Add(typeutil.LifetimeStateWorking) {
return status.NewOnShutdownError("assignment service client is closing")
}
defer c.lifetime.Done()
@ -75,7 +75,7 @@ func (c *AssignmentServiceImpl) ReportAssignmentError(ctx context.Context, pchan
// Close closes the assignment service.
func (c *AssignmentServiceImpl) Close() {
c.lifetime.SetState(lifetime.Stopped)
c.lifetime.SetState(typeutil.LifetimeStateStopped)
c.lifetime.Wait()
c.cancel()

View File

@ -8,14 +8,13 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// newAssignmentDiscoverClient creates a new assignment discover client.
func newAssignmentDiscoverClient(w *watcher, streamClient streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient) *assignmentDiscoverClient {
c := &assignmentDiscoverClient{
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
w: w,
streamClient: streamClient,
logger: log.With(),
@ -29,7 +28,7 @@ func newAssignmentDiscoverClient(w *watcher, streamClient streamingpb.StreamingC
// assignmentDiscoverClient is the client for assignment discover.
type assignmentDiscoverClient struct {
lifetime lifetime.Lifetime[lifetime.State]
lifetime *typeutil.Lifetime
w *watcher
logger *log.MLogger
requestCh chan *streamingpb.AssignmentDiscoverRequest
@ -40,7 +39,7 @@ type assignmentDiscoverClient struct {
// ReportAssignmentError reports the assignment error to server.
func (c *assignmentDiscoverClient) ReportAssignmentError(pchannel types.PChannelInfo, err error) {
if err := c.lifetime.Add(lifetime.IsWorking); err != nil {
if !c.lifetime.Add(typeutil.LifetimeStateWorking) {
return
}
defer c.lifetime.Done()
@ -75,9 +74,8 @@ func (c *assignmentDiscoverClient) Available() <-chan struct{} {
// Close closes the assignment discover client.
func (c *assignmentDiscoverClient) Close() {
c.lifetime.SetState(lifetime.Stopped)
c.lifetime.SetState(typeutil.LifetimeStateStopped)
c.lifetime.Wait()
c.lifetime.Close()
close(c.requestCh)
c.wg.Wait()

View File

@ -13,7 +13,6 @@ import (
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -32,7 +31,7 @@ func RecoverBalancer(
return nil, errors.Wrap(err, "fail to recover channel manager")
}
b := &balancerImpl{
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
logger: log.With(zap.String("policy", policy)),
channelMetaManager: manager,
policy: mustGetPolicy(policy),
@ -45,7 +44,7 @@ func RecoverBalancer(
// balancerImpl is an implementation of Balancer.
type balancerImpl struct {
lifetime lifetime.Lifetime[lifetime.State]
lifetime *typeutil.Lifetime
logger *log.MLogger
channelMetaManager *channel.ChannelManager
policy Policy // policy is the balance policy, TODO: should be dynamic in future.
@ -55,7 +54,7 @@ type balancerImpl struct {
// WatchChannelAssignments watches the balance result.
func (b *balancerImpl) WatchChannelAssignments(ctx context.Context, cb func(version typeutil.VersionInt64Pair, relations []types.PChannelInfoAssigned) error) error {
if b.lifetime.Add(lifetime.IsWorking) != nil {
if !b.lifetime.Add(typeutil.LifetimeStateWorking) {
return status.NewOnShutdownError("balancer is closing")
}
defer b.lifetime.Done()
@ -63,7 +62,7 @@ func (b *balancerImpl) WatchChannelAssignments(ctx context.Context, cb func(vers
}
func (b *balancerImpl) MarkAsUnavailable(ctx context.Context, pChannels []types.PChannelInfo) error {
if b.lifetime.Add(lifetime.IsWorking) != nil {
if !b.lifetime.Add(typeutil.LifetimeStateWorking) {
return status.NewOnShutdownError("balancer is closing")
}
defer b.lifetime.Done()
@ -73,7 +72,7 @@ func (b *balancerImpl) MarkAsUnavailable(ctx context.Context, pChannels []types.
// Trigger triggers a re-balance.
func (b *balancerImpl) Trigger(ctx context.Context) error {
if b.lifetime.Add(lifetime.IsWorking) != nil {
if !b.lifetime.Add(typeutil.LifetimeStateWorking) {
return status.NewOnShutdownError("balancer is closing")
}
defer b.lifetime.Done()
@ -93,7 +92,7 @@ func (b *balancerImpl) sendRequestAndWaitFinish(ctx context.Context, newReq *req
// Close closes the balancer.
func (b *balancerImpl) Close() {
b.lifetime.SetState(lifetime.Stopped)
b.lifetime.SetState(typeutil.LifetimeStateStopped)
b.lifetime.Wait()
b.backgroundTaskNotifier.Cancel()

View File

@ -23,7 +23,6 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util/interceptor"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -100,7 +99,7 @@ func NewHandlerClient(w types.AssignmentDiscoverWatcher) HandlerClient {
})
watcher := assignment.NewWatcher(rb.Resolver())
return &handlerClientImpl{
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
service: lazygrpc.WithServiceCreator(conn, streamingpb.NewStreamingNodeHandlerServiceClient),
rb: rb,
watcher: watcher,

View File

@ -18,13 +18,13 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var errWaitNextBackoff = errors.New("wait for next backoff")
type handlerClientImpl struct {
lifetime lifetime.Lifetime[lifetime.State]
lifetime *typeutil.Lifetime
service lazygrpc.Service[streamingpb.StreamingNodeHandlerServiceClient]
rb resolver.Builder
watcher assignment.Watcher
@ -35,7 +35,7 @@ type handlerClientImpl struct {
// CreateProducer creates a producer.
func (hc *handlerClientImpl) CreateProducer(ctx context.Context, opts *ProducerOptions) (Producer, error) {
if hc.lifetime.Add(lifetime.IsWorking) != nil {
if !hc.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, ErrClientClosed
}
defer hc.lifetime.Done()
@ -58,7 +58,7 @@ func (hc *handlerClientImpl) CreateProducer(ctx context.Context, opts *ProducerO
// CreateConsumer creates a consumer.
func (hc *handlerClientImpl) CreateConsumer(ctx context.Context, opts *ConsumerOptions) (Consumer, error) {
if hc.lifetime.Add(lifetime.IsWorking) != nil {
if !hc.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, ErrClientClosed
}
defer hc.lifetime.Done()
@ -135,9 +135,8 @@ func (hc *handlerClientImpl) waitForNextBackoff(ctx context.Context, pchannel st
// Close closes the handler client.
func (hc *handlerClientImpl) Close() {
hc.lifetime.SetState(lifetime.Stopped)
hc.lifetime.SetState(typeutil.LifetimeStateStopped)
hc.lifetime.Wait()
hc.lifetime.Close()
hc.watcher.Close()
hc.service.Close()

View File

@ -22,8 +22,8 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func TestHandlerClient(t *testing.T) {
@ -50,7 +50,7 @@ func TestHandlerClient(t *testing.T) {
pK := 0
handler := &handlerClientImpl{
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
service: service,
rb: rb,
watcher: w,

View File

@ -15,7 +15,6 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -62,7 +61,7 @@ func CreateProducer(
zap.String("pchannel", opts.Assignment.Channel.Name),
zap.Int64("term", opts.Assignment.Channel.Term),
zap.Int64("streamingNodeID", opts.Assignment.Node.ServerID)),
lifetime: lifetime.NewLifetime[lifetime.State](lifetime.Working),
lifetime: typeutil.NewLifetime(),
idAllocator: typeutil.NewIDAllocator(),
grpcStreamClient: produceClient,
pendingRequests: sync.Map{},
@ -97,7 +96,7 @@ type producerImpl struct {
assignment types.PChannelInfoAssigned
walName string
logger *log.MLogger
lifetime lifetime.Lifetime[lifetime.State]
lifetime *typeutil.Lifetime
idAllocator *typeutil.IDAllocator
grpcStreamClient *produceGrpcClient
@ -126,7 +125,7 @@ func (p *producerImpl) Assignment() types.PChannelInfoAssigned {
// Produce sends the produce message to server.
func (p *producerImpl) Produce(ctx context.Context, msg message.MutableMessage) (*ProduceResult, error) {
if p.lifetime.Add(lifetime.IsWorking) != nil {
if !p.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, status.NewOnShutdownError("producer client is shutting down")
}
defer p.lifetime.Done()
@ -196,7 +195,7 @@ func (p *producerImpl) Available() <-chan struct{} {
// Close closes the producer client.
func (p *producerImpl) Close() {
// Wait until all messages have been sent.
p.lifetime.SetState(lifetime.Stopped)
p.lifetime.SetState(typeutil.LifetimeStateStopped)
p.lifetime.Wait()
close(p.requestCh)

View File

@ -19,7 +19,6 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util/interceptor"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -61,7 +60,8 @@ func NewManagerClient(etcdCli *clientv3.Client) ManagerClient {
)
})
return &managerClientImpl{
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
stopped: make(chan struct{}),
rb: rb,
service: lazygrpc.WithServiceCreator(conn, streamingpb.NewStreamingNodeManagerServiceClient),
}

View File

@ -16,21 +16,22 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var _ ManagerClient = (*managerClientImpl)(nil)
// managerClientImpl implements ManagerClient.
type managerClientImpl struct {
lifetime lifetime.Lifetime[lifetime.State]
lifetime *typeutil.Lifetime
stopped chan struct{}
rb resolver.Builder
service lazygrpc.Service[streamingpb.StreamingNodeManagerServiceClient]
}
func (c *managerClientImpl) WatchNodeChanged(ctx context.Context) (<-chan struct{}, error) {
if c.lifetime.Add(lifetime.IsWorking) != nil {
if !c.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, status.NewOnShutdownError("manager client is closing")
}
defer c.lifetime.Done()
@ -42,7 +43,7 @@ func (c *managerClientImpl) WatchNodeChanged(ctx context.Context) (<-chan struct
select {
case <-ctx.Done():
return ctx.Err()
case <-c.lifetime.CloseCh():
case <-c.stopped:
return status.NewOnShutdownError("manager client is closing")
case resultCh <- struct{}{}:
}
@ -54,7 +55,7 @@ func (c *managerClientImpl) WatchNodeChanged(ctx context.Context) (<-chan struct
// CollectAllStatus collects status in all underlying streamingnode.
func (c *managerClientImpl) CollectAllStatus(ctx context.Context) (map[int64]*types.StreamingNodeStatus, error) {
if c.lifetime.Add(lifetime.IsWorking) != nil {
if !c.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, status.NewOnShutdownError("manager client is closing")
}
defer c.lifetime.Done()
@ -129,7 +130,7 @@ func (c *managerClientImpl) getAllStreamingNodeStatus(ctx context.Context, state
// Assign a wal instance for the channel on log node of given server id.
func (c *managerClientImpl) Assign(ctx context.Context, pchannel types.PChannelInfoAssigned) error {
if c.lifetime.Add(lifetime.IsWorking) != nil {
if !c.lifetime.Add(typeutil.LifetimeStateWorking) {
return status.NewOnShutdownError("manager client is closing")
}
defer c.lifetime.Done()
@ -150,7 +151,7 @@ func (c *managerClientImpl) Assign(ctx context.Context, pchannel types.PChannelI
// Remove the wal instance for the channel on log node of given server id.
func (c *managerClientImpl) Remove(ctx context.Context, pchannel types.PChannelInfoAssigned) error {
if c.lifetime.Add(lifetime.IsWorking) != nil {
if !c.lifetime.Add(typeutil.LifetimeStateWorking) {
return status.NewOnShutdownError("manager client is closing")
}
defer c.lifetime.Done()
@ -182,9 +183,9 @@ func (c *managerClientImpl) Remove(ctx context.Context, pchannel types.PChannelI
// Close closes the manager client.
func (c *managerClientImpl) Close() {
c.lifetime.SetState(lifetime.Stopped)
c.lifetime.SetState(typeutil.LifetimeStateStopped)
close(c.stopped)
c.lifetime.Wait()
c.lifetime.Close()
c.service.Close()
c.rb.Close()

View File

@ -21,7 +21,6 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -30,7 +29,8 @@ func TestManager(t *testing.T) {
rb := mock_resolver.NewMockBuilder(t)
managerService := mock_lazygrpc.NewMockService[streamingpb.StreamingNodeManagerServiceClient](t)
m := &managerClientImpl{
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
stopped: make(chan struct{}),
rb: rb,
service: managerService,
}

View File

@ -10,7 +10,6 @@ import (
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -19,7 +18,7 @@ var _ wal.Opener = (*openerAdaptorImpl)(nil)
// adaptImplsToOpener creates a new wal opener with opener impls.
func adaptImplsToOpener(opener walimpls.OpenerImpls, builders []interceptors.InterceptorBuilder) wal.Opener {
return &openerAdaptorImpl{
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
opener: opener,
idAllocator: typeutil.NewIDAllocator(),
walInstances: typeutil.NewConcurrentMap[int64, wal.WAL](),
@ -29,7 +28,7 @@ func adaptImplsToOpener(opener walimpls.OpenerImpls, builders []interceptors.Int
// openerAdaptorImpl is the wrapper of OpenerImpls to Opener.
type openerAdaptorImpl struct {
lifetime lifetime.Lifetime[lifetime.State]
lifetime *typeutil.Lifetime
opener walimpls.OpenerImpls
idAllocator *typeutil.IDAllocator
walInstances *typeutil.ConcurrentMap[int64, wal.WAL] // store all wal instances allocated by these allocator.
@ -38,7 +37,7 @@ type openerAdaptorImpl struct {
// Open opens a wal instance for the channel.
func (o *openerAdaptorImpl) Open(ctx context.Context, opt *wal.OpenOption) (wal.WAL, error) {
if o.lifetime.Add(lifetime.IsWorking) != nil {
if !o.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, status.NewOnShutdownError("wal opener is on shutdown")
}
defer o.lifetime.Done()
@ -67,9 +66,8 @@ func (o *openerAdaptorImpl) Open(ctx context.Context, opt *wal.OpenOption) (wal.
// Close the wal opener, release the underlying resources.
func (o *openerAdaptorImpl) Close() {
o.lifetime.SetState(lifetime.Stopped)
o.lifetime.SetState(typeutil.LifetimeStateStopped)
o.lifetime.Wait()
o.lifetime.Close()
// close all wal instances.
o.walInstances.Range(func(id int64, l wal.WAL) bool {

View File

@ -16,7 +16,6 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -36,7 +35,8 @@ func adaptImplsToWAL(
WAL: syncutil.NewFuture[wal.WAL](),
}
wal := &walAdaptorImpl{
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
available: make(chan struct{}),
idAllocator: typeutil.NewIDAllocator(),
inner: basicWAL,
// TODO: make the pool size configurable.
@ -57,7 +57,8 @@ func adaptImplsToWAL(
// walAdaptorImpl is a wrapper of WALImpls to extend it into a WAL interface.
type walAdaptorImpl struct {
lifetime lifetime.Lifetime[lifetime.State]
lifetime *typeutil.Lifetime
available chan struct{}
idAllocator *typeutil.IDAllocator
inner walimpls.WALImpls
appendExecutionPool *conc.Pool[struct{}]
@ -80,7 +81,7 @@ func (w *walAdaptorImpl) Channel() types.PChannelInfo {
// Append writes a record to the log.
func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage) (*wal.AppendResult, error) {
if w.lifetime.Add(lifetime.IsWorking) != nil {
if !w.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, status.NewOnShutdownError("wal is on shutdown")
}
defer w.lifetime.Done()
@ -137,7 +138,7 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage)
// AppendAsync writes a record to the log asynchronously.
func (w *walAdaptorImpl) AppendAsync(ctx context.Context, msg message.MutableMessage, cb func(*wal.AppendResult, error)) {
if w.lifetime.Add(lifetime.IsWorking) != nil {
if !w.lifetime.Add(typeutil.LifetimeStateWorking) {
cb(nil, status.NewOnShutdownError("wal is on shutdown"))
return
}
@ -154,7 +155,7 @@ func (w *walAdaptorImpl) AppendAsync(ctx context.Context, msg message.MutableMes
// Read returns a scanner for reading records from the wal.
func (w *walAdaptorImpl) Read(ctx context.Context, opts wal.ReadOption) (wal.Scanner, error) {
if w.lifetime.Add(lifetime.IsWorking) != nil {
if !w.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, status.NewOnShutdownError("wal is on shutdown")
}
defer w.lifetime.Done()
@ -177,12 +178,17 @@ func (w *walAdaptorImpl) Read(ctx context.Context, opts wal.ReadOption) (wal.Sca
// IsAvailable returns whether the wal is available.
func (w *walAdaptorImpl) IsAvailable() bool {
return !w.lifetime.IsClosed()
select {
case <-w.available:
return false
default:
return true
}
}
// Available returns a channel that will be closed when the wal is shut down.
func (w *walAdaptorImpl) Available() <-chan struct{} {
return w.lifetime.CloseCh()
return w.available
}
// Close overrides Scanner Close function.
@ -195,9 +201,9 @@ func (w *walAdaptorImpl) Close() {
logger.Info("wal graceful close done, wait for operation to be finished...")
// begin to close the wal.
w.lifetime.SetState(lifetime.Stopped)
w.lifetime.SetState(typeutil.LifetimeStateStopped)
w.lifetime.Wait()
w.lifetime.Close()
close(w.available)
logger.Info("wal begin to close scanners...")

View File

@ -15,9 +15,9 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// RecoverPChannelSegmentAllocManager recovers the segment assignment manager at the specified pchannel.
@ -45,7 +45,7 @@ func RecoverPChannelSegmentAllocManager(
logger := log.With(zap.Any("pchannel", pchannel))
return &PChannelSegmentAllocManager{
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
logger: logger,
pchannel: pchannel,
managers: managers,
@ -56,7 +56,7 @@ func RecoverPChannelSegmentAllocManager(
// PChannelSegmentAllocManager is a segment assign manager of determined pchannel.
type PChannelSegmentAllocManager struct {
lifetime lifetime.Lifetime[lifetime.State]
lifetime *typeutil.Lifetime
logger *log.MLogger
pchannel types.PChannelInfo
@ -175,7 +175,7 @@ func (m *PChannelSegmentAllocManager) SealAndFenceSegmentUntil(ctx context.Conte
// TryToSealSegments tries to seal the specified segments.
func (m *PChannelSegmentAllocManager) TryToSealSegments(ctx context.Context, infos ...stats.SegmentBelongs) {
if err := m.lifetime.Add(lifetime.IsWorking); err != nil {
if !m.lifetime.Add(typeutil.LifetimeStateWorking) {
return
}
defer m.lifetime.Done()
@ -197,7 +197,7 @@ func (m *PChannelSegmentAllocManager) TryToSealSegments(ctx context.Context, inf
}
func (m *PChannelSegmentAllocManager) MustSealSegments(ctx context.Context, infos ...stats.SegmentBelongs) {
if err := m.lifetime.Add(lifetime.IsWorking); err != nil {
if !m.lifetime.Add(typeutil.LifetimeStateWorking) {
return
}
defer m.lifetime.Done()
@ -221,7 +221,7 @@ func (m *PChannelSegmentAllocManager) MustSealSegments(ctx context.Context, info
// TryToSealWaitedSegment tries to seal the segments that are waiting to be sealed.
func (m *PChannelSegmentAllocManager) TryToSealWaitedSegment(ctx context.Context) {
if err := m.lifetime.Add(lifetime.IsWorking); err != nil {
if !m.lifetime.Add(typeutil.LifetimeStateWorking) {
return
}
defer m.lifetime.Done()
@ -236,7 +236,7 @@ func (m *PChannelSegmentAllocManager) IsNoWaitSeal() bool {
// WaitUntilNoWaitSeal waits until no segment is waiting to be sealed.
func (m *PChannelSegmentAllocManager) WaitUntilNoWaitSeal(ctx context.Context) error {
if err := m.lifetime.Add(lifetime.IsWorking); err != nil {
if err := m.checkLifetime(); err != nil {
return err
}
defer m.lifetime.Done()
@ -246,8 +246,8 @@ func (m *PChannelSegmentAllocManager) WaitUntilNoWaitSeal(ctx context.Context) e
// checkLifetime checks the lifetime of the segment manager.
func (m *PChannelSegmentAllocManager) checkLifetime() error {
if err := m.lifetime.Add(lifetime.IsWorking); err != nil {
m.logger.Warn("unreachable: segment assignment manager is not working, so the wal is on closing", zap.Error(err))
if !m.lifetime.Add(typeutil.LifetimeStateWorking) {
m.logger.Warn("unreachable: segment assignment manager is not working, so the wal is on closing")
return errors.New("segment assignment manager is not working")
}
return nil
@ -256,7 +256,7 @@ func (m *PChannelSegmentAllocManager) checkLifetime() error {
// Close try to persist all stats and invalid the manager.
func (m *PChannelSegmentAllocManager) Close(ctx context.Context) {
m.logger.Info("segment assignment manager start to close")
m.lifetime.SetState(lifetime.Stopped)
m.lifetime.SetState(typeutil.LifetimeStateStopped)
m.lifetime.Wait()
// Try to seal all wait

View File

@ -3,7 +3,6 @@ package walmanager
import (
"context"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
@ -12,10 +11,11 @@ import (
"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var errWALManagerClosed = status.NewOnShutdownError("wal manager is closed")
// OpenManager create a wal manager.
func OpenManager() (Manager, error) {
walName := util.MustSelectWALName()
@ -30,7 +30,7 @@ func OpenManager() (Manager, error) {
// newManager create a wal manager.
func newManager(opener wal.Opener) Manager {
return &managerImpl{
lifetime: lifetime.NewLifetime(managerOpenable | managerRemoveable | managerGetable),
lifetime: typeutil.NewGenericLifetime[managerState](managerOpenable | managerRemoveable | managerGetable),
wltMap: typeutil.NewConcurrentMap[string, *walLifetime](),
opener: opener,
}
@ -38,7 +38,7 @@ func newManager(opener wal.Opener) Manager {
// All management operations for a wal are serialized in term order.
type managerImpl struct {
lifetime lifetime.Lifetime[managerState]
lifetime *typeutil.GenericLifetime[managerState]
wltMap *typeutil.ConcurrentMap[string, *walLifetime]
opener wal.Opener // wal allocator
@ -47,8 +47,8 @@ type managerImpl struct {
// Open opens a wal instance for the channel on this Manager.
func (m *managerImpl) Open(ctx context.Context, channel types.PChannelInfo) (err error) {
// reject operation if manager is closing.
if err := m.lifetime.Add(isOpenable); err != nil {
return status.NewOnShutdownError("wal manager is closed, %s", err.Error())
if !m.lifetime.AddIf(isOpenable) {
return errWALManagerClosed
}
defer func() {
m.lifetime.Done()
@ -65,8 +65,8 @@ func (m *managerImpl) Open(ctx context.Context, channel types.PChannelInfo) (err
// Remove removes the wal instance for the channel.
func (m *managerImpl) Remove(ctx context.Context, channel types.PChannelInfo) (err error) {
// reject operation if manager is closing.
if err := m.lifetime.Add(isRemoveable); err != nil {
return status.NewOnShutdownError("wal manager is closed, %s", err.Error())
if !m.lifetime.AddIf(isRemoveable) {
return errWALManagerClosed
}
defer func() {
m.lifetime.Done()
@ -84,8 +84,8 @@ func (m *managerImpl) Remove(ctx context.Context, channel types.PChannelInfo) (e
// Return nil if the wal instance is not found.
func (m *managerImpl) GetAvailableWAL(channel types.PChannelInfo) (wal.WAL, error) {
// reject operation if manager is closing.
if err := m.lifetime.Add(isGetable); err != nil {
return nil, status.NewOnShutdownError("wal manager is closed, %s", err)
if !m.lifetime.AddIf(isGetable) {
return nil, errWALManagerClosed
}
defer m.lifetime.Done()
@ -104,8 +104,8 @@ func (m *managerImpl) GetAvailableWAL(channel types.PChannelInfo) (wal.WAL, erro
// GetAllAvailableChannels returns all available channel info.
func (m *managerImpl) GetAllAvailableChannels() ([]types.PChannelInfo, error) {
// reject operation if manager is closing.
if err := m.lifetime.Add(isGetable); err != nil {
return nil, status.NewOnShutdownError("wal manager is closed, %s", err)
if !m.lifetime.AddIf(isGetable) {
return nil, errWALManagerClosed
}
defer m.lifetime.Done()
@ -132,7 +132,6 @@ func (m *managerImpl) Close() {
})
m.lifetime.SetState(managerStopped)
m.lifetime.Wait()
m.lifetime.Close()
// close all underlying wal instance by allocator if there's resource leak.
m.opener.Close()
@ -163,23 +162,14 @@ const (
managerGetable managerState = 0x1 << 2
)
func isGetable(state managerState) error {
if state&managerGetable != 0 {
return nil
}
return errors.New("wal manager can not do get operation")
func isGetable(state managerState) bool {
return state&managerGetable != 0
}
func isRemoveable(state managerState) error {
if state&managerRemoveable != 0 {
return nil
}
return errors.New("wal manager can not do remove operation")
func isRemoveable(state managerState) bool {
return state&managerRemoveable != 0
}
func isOpenable(state managerState) error {
if state&managerOpenable != 0 {
return nil
}
return errors.New("wal manager can not do open operation")
func isOpenable(state managerState) bool {
return state&managerOpenable != 0
}

View File

@ -10,7 +10,6 @@ import (
"github.com/milvus-io/milvus/internal/util/streamingutil/service/discoverer"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -38,7 +37,7 @@ func NewSessionBuilder(c *clientv3.Client, role string) Builder {
func newBuilder(scheme string, d discoverer.Discoverer) Builder {
resolver := newResolverWithDiscoverer(scheme, d, 1*time.Second) // configurable.
return &builderImpl{
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
scheme: scheme,
resolver: resolver,
}
@ -46,7 +45,7 @@ func newBuilder(scheme string, d discoverer.Discoverer) Builder {
// builderImpl implements resolver.Builder.
type builderImpl struct {
lifetime lifetime.Lifetime[lifetime.State]
lifetime *typeutil.Lifetime
scheme string
resolver *resolverWithDiscoverer
}
@ -60,7 +59,7 @@ type builderImpl struct {
// A Resolver is built when a Builder is constructed,
// so the build operation just registers a new watcher into the existing resolver to share the resolution result.
func (b *builderImpl) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
if err := b.lifetime.Add(lifetime.IsWorking); err != nil {
if !b.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, errors.New("builder is closed")
}
defer b.lifetime.Done()
@ -84,8 +83,7 @@ func (b *builderImpl) Scheme() string {
// Close closes the builder and the underlying resolver.
func (b *builderImpl) Close() {
b.lifetime.SetState(lifetime.Stopped)
b.lifetime.SetState(typeutil.LifetimeStateStopped)
b.lifetime.Wait()
b.lifetime.Close()
b.resolver.Close()
}

View File

@ -10,7 +10,6 @@ import (
"github.com/milvus-io/milvus/internal/util/streamingutil/service/discoverer"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -115,7 +114,7 @@ func (r *resolverWithDiscoverer) doDiscover() {
defer func() {
// Check if all grpc resolver is stopped.
for r := range grpcResolvers {
if err := lifetime.IsWorking(r.State()); err == nil {
if r.State() == typeutil.LifetimeStateWorking {
r.logger.Warn("resolver is stopped before grpc watcher exist, maybe bug here")
break
}

View File

@ -6,7 +6,7 @@ import (
"google.golang.org/grpc/resolver"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var _ resolver.Resolver = (*watchBasedGRPCResolver)(nil)
@ -14,7 +14,7 @@ var _ resolver.Resolver = (*watchBasedGRPCResolver)(nil)
// newWatchBasedGRPCResolver creates a new watch based grpc resolver.
func newWatchBasedGRPCResolver(cc resolver.ClientConn, logger *log.MLogger) *watchBasedGRPCResolver {
return &watchBasedGRPCResolver{
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
cc: cc,
logger: logger,
}
@ -22,7 +22,7 @@ func newWatchBasedGRPCResolver(cc resolver.ClientConn, logger *log.MLogger) *wat
// watchBasedGRPCResolver is a watch based grpc resolver.
type watchBasedGRPCResolver struct {
lifetime lifetime.Lifetime[lifetime.State]
lifetime *typeutil.Lifetime
cc resolver.ClientConn
logger *log.MLogger
@ -38,15 +38,14 @@ func (r *watchBasedGRPCResolver) ResolveNow(_ resolver.ResolveNowOptions) {
// Close closes the resolver.
func (r *watchBasedGRPCResolver) Close() {
r.lifetime.SetState(lifetime.Stopped)
r.lifetime.SetState(typeutil.LifetimeStateStopped)
r.lifetime.Wait()
r.lifetime.Close()
}
// Update updates the state of the resolver.
// Return error if the resolver is closed.
func (r *watchBasedGRPCResolver) Update(state VersionedState) error {
if r.lifetime.Add(lifetime.IsWorking) != nil {
if !r.lifetime.Add(typeutil.LifetimeStateWorking) {
return errors.New("resolver is closed")
}
defer r.lifetime.Done()
@ -61,6 +60,6 @@ func (r *watchBasedGRPCResolver) Update(state VersionedState) error {
}
// State returns the state of the resolver.
func (r *watchBasedGRPCResolver) State() lifetime.State {
func (r *watchBasedGRPCResolver) State() typeutil.LifetimeState {
return r.lifetime.GetState()
}

View File

@ -0,0 +1,68 @@
package typeutil

import "sync"

// LifetimeState is the state of the default Lifetime.
type LifetimeState int

var (
	LifetimeStateWorking LifetimeState = 0
	LifetimeStateStopped LifetimeState = 1
)

// NewLifetime returns a new instance of Lifetime with the default state logic.
func NewLifetime() *Lifetime {
	return NewGenericLifetime(LifetimeStateWorking)
}

// NewGenericLifetime returns a new instance of GenericLifetime with the given initial state.
// WARNING: This is an unsafe type; the lifetime state transfer should never form a loop.
// The state is controlled by the user, and the user should ensure the state transfer is correct.
func NewGenericLifetime[State comparable](initState State) *GenericLifetime[State] {
	return &GenericLifetime[State]{
		mu:    sync.Mutex{},
		wg:    sync.WaitGroup{},
		state: initState,
	}
}

// Lifetime is a GenericLifetime over the default working/stopped state machine.
type Lifetime = GenericLifetime[LifetimeState]

// GenericLifetime is a common component lifetime control logic.
type GenericLifetime[State comparable] struct {
	mu    sync.Mutex
	wg    sync.WaitGroup
	state State
}

// GetState returns the current state.
func (l *GenericLifetime[State]) GetState() State {
	return l.state
}

// SetState sets the current state.
func (l *GenericLifetime[State]) SetState(s State) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.state = s
}

// Add registers an in-flight operation if the current state equals s.
func (l *GenericLifetime[State]) Add(s State) bool {
	return l.AddIf(func(s2 State) bool { return s == s2 })
}

// AddIf registers an in-flight operation if the predicate holds for the current state.
func (l *GenericLifetime[State]) AddIf(pred func(s State) bool) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if !pred(l.state) {
		return false
	}
	l.wg.Add(1)
	return true
}

// Done marks an in-flight operation as finished.
func (l *GenericLifetime[State]) Done() {
	l.wg.Done()
}

// Wait blocks until all in-flight operations are done.
func (l *GenericLifetime[State]) Wait() {
	l.wg.Wait()
}
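The walmanager hunks above use the generic variant: a bitmask state with AddIf predicates, so several capabilities can be revoked at once by a single SetState. A condensed sketch of that pattern; the underlying type of managerState, the bit values of managerOpenable and managerRemoveable, and the zero value of managerStopped are assumptions, since the diff only shows managerGetable:

package main

import "github.com/milvus-io/milvus/pkg/util/typeutil"

// managerState mirrors the bitmask used by walmanager; int32 is an assumption.
type managerState int32

const (
	managerOpenable   managerState = 0x1      // assumed bit value
	managerRemoveable managerState = 0x1 << 1 // assumed bit value
	managerGetable    managerState = 0x1 << 2
	managerStopped    managerState = 0 // assumed: no capability bits set
)

// isGetable matches the new bool-returning predicate helpers in the diff.
func isGetable(state managerState) bool {
	return state&managerGetable != 0
}

func main() {
	// Start with all capabilities enabled, as newManager does.
	lt := typeutil.NewGenericLifetime(managerOpenable | managerRemoveable | managerGetable)

	// AddIf admits the caller only while the predicate holds for the state.
	if lt.AddIf(isGetable) {
		// ... serve the get operation ...
		lt.Done()
	}

	// Shutdown: clear all bits so every predicate fails, then drain.
	lt.SetState(managerStopped)
	lt.Wait()
}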

View File

@ -0,0 +1,36 @@
package typeutil

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func TestLifetime(t *testing.T) {
	l := NewLifetime()
	assert.True(t, l.Add(LifetimeStateWorking))
	assert.False(t, l.Add(LifetimeStateStopped))
	assert.Equal(t, l.GetState(), LifetimeStateWorking)

	done := make(chan struct{})
	go func() {
		l.Wait()
		close(done)
	}()

	select {
	case <-time.After(10 * time.Millisecond):
	case <-done:
		assert.Fail(t, "lifetime should not be stopped")
	}

	l.SetState(LifetimeStateStopped)
	assert.Equal(t, l.GetState(), LifetimeStateStopped)
	assert.False(t, l.Add(LifetimeStateWorking))

	select {
	case <-time.After(10 * time.Millisecond):
	case <-done:
		assert.Fail(t, "lifetime should not be stopped")
	}

	l.Done()
	<-done
	l.Wait()
}
}