mirror of https://github.com/milvus-io/milvus.git
				
				
				
			fix: add future stateful lock (#36332)
issue: #36323 Signed-off-by: chyezh <chyezh@outlook.com>pull/36266/head
							parent
							
								
									a03397ba70
								
							
						
					
					
						commit
						47da9023a6
					
				| 
						 | 
				
			
			@ -91,13 +91,12 @@ func Async(ctx context.Context, f CGOAsyncFunction, opts ...Opt) Future {
 | 
			
		|||
 | 
			
		||||
	ctx, cancel := context.WithCancel(ctx)
 | 
			
		||||
	future := &futureImpl{
 | 
			
		||||
		closure:      f,
 | 
			
		||||
		ctx:          ctx,
 | 
			
		||||
		ctxCancel:    cancel,
 | 
			
		||||
		releaserOnce: sync.Once{},
 | 
			
		||||
		future:       cFuturePtr,
 | 
			
		||||
		opts:         options,
 | 
			
		||||
		state:        newFutureState(),
 | 
			
		||||
		closure:   f,
 | 
			
		||||
		ctx:       ctx,
 | 
			
		||||
		ctxCancel: cancel,
 | 
			
		||||
		future:    cFuturePtr,
 | 
			
		||||
		opts:      options,
 | 
			
		||||
		state:     newFutureState(),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// register the future to do timeout notification.
 | 
			
		||||
| 
						 | 
				
			
			@ -106,29 +105,33 @@ func Async(ctx context.Context, f CGOAsyncFunction, opts ...Opt) Future {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
type futureImpl struct {
 | 
			
		||||
	ctx          context.Context
 | 
			
		||||
	ctxCancel    context.CancelFunc
 | 
			
		||||
	future       *C.CFuture
 | 
			
		||||
	closure      CGOAsyncFunction
 | 
			
		||||
	opts         *options
 | 
			
		||||
	state        futureState
 | 
			
		||||
	releaserOnce sync.Once
 | 
			
		||||
	ctx       context.Context
 | 
			
		||||
	ctxCancel context.CancelFunc
 | 
			
		||||
	future    *C.CFuture
 | 
			
		||||
	closure   CGOAsyncFunction
 | 
			
		||||
	opts      *options
 | 
			
		||||
	state     futureState
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Context return the context of the future.
 | 
			
		||||
func (f *futureImpl) Context() context.Context {
 | 
			
		||||
	return f.ctx
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// BlockUntilReady block until the future is ready or canceled.
 | 
			
		||||
func (f *futureImpl) BlockUntilReady() {
 | 
			
		||||
	f.blockUntilReady()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// BlockAndLeakyGet block until the future is ready or canceled, and return the leaky result.
 | 
			
		||||
func (f *futureImpl) BlockAndLeakyGet() (unsafe.Pointer, error) {
 | 
			
		||||
	f.blockUntilReady()
 | 
			
		||||
 | 
			
		||||
	if !f.state.intoConsumed() {
 | 
			
		||||
	guard := f.state.LockForConsume()
 | 
			
		||||
	if guard == nil {
 | 
			
		||||
		return nil, ErrConsumed
 | 
			
		||||
	}
 | 
			
		||||
	defer guard.Unlock()
 | 
			
		||||
 | 
			
		||||
	var ptr unsafe.Pointer
 | 
			
		||||
	var status C.CStatus
 | 
			
		||||
| 
						 | 
				
			
			@ -144,21 +147,31 @@ func (f *futureImpl) BlockAndLeakyGet() (unsafe.Pointer, error) {
 | 
			
		|||
	return ptr, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Release the resource of the future.
 | 
			
		||||
func (f *futureImpl) Release() {
 | 
			
		||||
	// block until ready to release the future.
 | 
			
		||||
	f.blockUntilReady()
 | 
			
		||||
 | 
			
		||||
	guard := f.state.LockForRelease()
 | 
			
		||||
	if guard == nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	defer guard.Unlock()
 | 
			
		||||
 | 
			
		||||
	// release the future.
 | 
			
		||||
	getCGOCaller().call("future_destroy", func() {
 | 
			
		||||
		C.future_destroy(f.future)
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// cancel the future with error.
 | 
			
		||||
func (f *futureImpl) cancel(err error) {
 | 
			
		||||
	if !f.state.checkUnready() {
 | 
			
		||||
		// only unready future can be canceled.
 | 
			
		||||
		// a ready future' cancel make no sense.
 | 
			
		||||
	// only unready future can be canceled.
 | 
			
		||||
	guard := f.state.LockForCancel()
 | 
			
		||||
	if guard == nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	defer guard.Unlock()
 | 
			
		||||
 | 
			
		||||
	if errors.IsAny(err, context.DeadlineExceeded, context.Canceled) {
 | 
			
		||||
		getCGOCaller().call("future_cancel", func() {
 | 
			
		||||
| 
						 | 
				
			
			@ -169,8 +182,9 @@ func (f *futureImpl) cancel(err error) {
 | 
			
		|||
	panic("unreachable: invalid cancel error type")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// blockUntilReady block until the future is ready or canceled.
 | 
			
		||||
func (f *futureImpl) blockUntilReady() {
 | 
			
		||||
	if !f.state.checkUnready() {
 | 
			
		||||
	if !f.state.CheckUnready() {
 | 
			
		||||
		// only unready future should be block until ready.
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -183,10 +197,7 @@ func (f *futureImpl) blockUntilReady() {
 | 
			
		|||
	mu.Lock()
 | 
			
		||||
 | 
			
		||||
	// mark the future as ready at go side to avoid more cgo calls.
 | 
			
		||||
	f.state.intoReady()
 | 
			
		||||
	f.state.IntoReady()
 | 
			
		||||
	// notify the future manager that the future is ready.
 | 
			
		||||
	f.ctxCancel()
 | 
			
		||||
	if f.opts.releaser != nil {
 | 
			
		||||
		f.releaserOnce.Do(f.opts.releaser)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -25,6 +25,32 @@ func TestMain(m *testing.M) {
 | 
			
		|||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestFutureWithConcurrentReleaseAndCancel(t *testing.T) {
 | 
			
		||||
	wg := sync.WaitGroup{}
 | 
			
		||||
	for i := 0; i < 20; i++ {
 | 
			
		||||
		future := createFutureWithTestCase(context.Background(), testCase{
 | 
			
		||||
			interval: 100 * time.Millisecond,
 | 
			
		||||
			loopCnt:  10,
 | 
			
		||||
			caseNo:   100,
 | 
			
		||||
		})
 | 
			
		||||
		wg.Add(3)
 | 
			
		||||
		// Double release should be ok.
 | 
			
		||||
		go func() {
 | 
			
		||||
			defer wg.Done()
 | 
			
		||||
			future.Release()
 | 
			
		||||
		}()
 | 
			
		||||
		go func() {
 | 
			
		||||
			defer wg.Done()
 | 
			
		||||
			future.Release()
 | 
			
		||||
		}()
 | 
			
		||||
		go func() {
 | 
			
		||||
			defer wg.Done()
 | 
			
		||||
			future.cancel(context.DeadlineExceeded)
 | 
			
		||||
		}()
 | 
			
		||||
	}
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestFutureWithSuccessCase(t *testing.T) {
 | 
			
		||||
	// Test success case.
 | 
			
		||||
	future := createFutureWithTestCase(context.Background(), testCase{
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,27 +2,17 @@ package cgo
 | 
			
		|||
 | 
			
		||||
func getDefaultOpt() *options {
 | 
			
		||||
	return &options{
 | 
			
		||||
		name:     "unknown",
 | 
			
		||||
		releaser: nil,
 | 
			
		||||
		name: "unknown",
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type options struct {
 | 
			
		||||
	name     string
 | 
			
		||||
	releaser func()
 | 
			
		||||
	name string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Opt is the option type for future.
 | 
			
		||||
type Opt func(*options)
 | 
			
		||||
 | 
			
		||||
// WithReleaser sets the releaser function.
 | 
			
		||||
// When a future is ready, the releaser function will be called once.
 | 
			
		||||
func WithReleaser(releaser func()) Opt {
 | 
			
		||||
	return func(o *options) {
 | 
			
		||||
		o.releaser = releaser
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WithName sets the name of the future.
 | 
			
		||||
// Only used for metrics.
 | 
			
		||||
func WithName(name string) Opt {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,38 +1,99 @@
 | 
			
		|||
package cgo
 | 
			
		||||
 | 
			
		||||
import "go.uber.org/atomic"
 | 
			
		||||
import (
 | 
			
		||||
	"sync"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	stateUnready int32 = iota
 | 
			
		||||
	stateUnready state = iota
 | 
			
		||||
	stateReady
 | 
			
		||||
	stateConsumed
 | 
			
		||||
	stateDestoryed
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// newFutureState creates a new futureState.
 | 
			
		||||
func newFutureState() futureState {
 | 
			
		||||
	return futureState{
 | 
			
		||||
		inner: atomic.NewInt32(stateUnready),
 | 
			
		||||
		mu:    sync.Mutex{},
 | 
			
		||||
		inner: stateUnready,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type state int32
 | 
			
		||||
 | 
			
		||||
// futureState is a state machine for future.
 | 
			
		||||
// unready --BlockUntilReady--> ready --BlockAndLeakyGet--> consumed
 | 
			
		||||
type futureState struct {
 | 
			
		||||
	inner *atomic.Int32
 | 
			
		||||
	mu    sync.Mutex
 | 
			
		||||
	inner state
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// intoReady sets the state to ready.
 | 
			
		||||
func (s *futureState) intoReady() {
 | 
			
		||||
	s.inner.CompareAndSwap(stateUnready, stateReady)
 | 
			
		||||
// LockForCancel locks the state for cancel.
 | 
			
		||||
func (s *futureState) LockForCancel() *lockGuard {
 | 
			
		||||
	s.mu.Lock()
 | 
			
		||||
	// only unready future can be canceled.
 | 
			
		||||
	// cancel on a ready future make no sense.
 | 
			
		||||
	if s.inner != stateUnready {
 | 
			
		||||
		s.mu.Unlock()
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	return &lockGuard{
 | 
			
		||||
		locker: s,
 | 
			
		||||
		target: stateUnready,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// intoConsumed sets the state to consumed.
 | 
			
		||||
// if the state is not ready, it does nothing and returns false.
 | 
			
		||||
func (s *futureState) intoConsumed() bool {
 | 
			
		||||
	return s.inner.CompareAndSwap(stateReady, stateConsumed)
 | 
			
		||||
// LockForConsume locks the state for consume.
 | 
			
		||||
func (s *futureState) LockForConsume() *lockGuard {
 | 
			
		||||
	s.mu.Lock()
 | 
			
		||||
	if s.inner != stateReady {
 | 
			
		||||
		s.mu.Unlock()
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	return &lockGuard{
 | 
			
		||||
		locker: s,
 | 
			
		||||
		target: stateConsumed,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// LockForRelease locks the state for release.
 | 
			
		||||
func (s *futureState) LockForRelease() *lockGuard {
 | 
			
		||||
	s.mu.Lock()
 | 
			
		||||
	if s.inner != stateReady && s.inner != stateConsumed {
 | 
			
		||||
		s.mu.Unlock()
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	return &lockGuard{
 | 
			
		||||
		locker: s,
 | 
			
		||||
		target: stateDestoryed,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// checkUnready checks if the state is unready.
 | 
			
		||||
func (s *futureState) checkUnready() bool {
 | 
			
		||||
	return s.inner.Load() == stateUnready
 | 
			
		||||
func (s *futureState) CheckUnready() bool {
 | 
			
		||||
	s.mu.Lock()
 | 
			
		||||
	defer s.mu.Unlock()
 | 
			
		||||
 | 
			
		||||
	return s.inner == stateUnready
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// IntoReady changes the state to ready.
 | 
			
		||||
func (s *futureState) IntoReady() {
 | 
			
		||||
	s.mu.Lock()
 | 
			
		||||
	if s.inner == stateUnready {
 | 
			
		||||
		s.inner = stateReady
 | 
			
		||||
	}
 | 
			
		||||
	s.mu.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// lockGuard is a guard for futureState.
 | 
			
		||||
type lockGuard struct {
 | 
			
		||||
	locker *futureState
 | 
			
		||||
	target state
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Unlock unlocks the state.
 | 
			
		||||
func (lg *lockGuard) Unlock() {
 | 
			
		||||
	lg.locker.inner = lg.target
 | 
			
		||||
	lg.locker.mu.Unlock()
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue