diff --git a/internal/util/cgo/futures.go b/internal/util/cgo/futures.go index c0a3b9885e..8f115737dd 100644 --- a/internal/util/cgo/futures.go +++ b/internal/util/cgo/futures.go @@ -91,13 +91,12 @@ func Async(ctx context.Context, f CGOAsyncFunction, opts ...Opt) Future { ctx, cancel := context.WithCancel(ctx) future := &futureImpl{ - closure: f, - ctx: ctx, - ctxCancel: cancel, - releaserOnce: sync.Once{}, - future: cFuturePtr, - opts: options, - state: newFutureState(), + closure: f, + ctx: ctx, + ctxCancel: cancel, + future: cFuturePtr, + opts: options, + state: newFutureState(), } // register the future to do timeout notification. @@ -106,29 +105,33 @@ func Async(ctx context.Context, f CGOAsyncFunction, opts ...Opt) Future { } type futureImpl struct { - ctx context.Context - ctxCancel context.CancelFunc - future *C.CFuture - closure CGOAsyncFunction - opts *options - state futureState - releaserOnce sync.Once + ctx context.Context + ctxCancel context.CancelFunc + future *C.CFuture + closure CGOAsyncFunction + opts *options + state futureState } +// Context return the context of the future. func (f *futureImpl) Context() context.Context { return f.ctx } +// BlockUntilReady block until the future is ready or canceled. func (f *futureImpl) BlockUntilReady() { f.blockUntilReady() } +// BlockAndLeakyGet block until the future is ready or canceled, and return the leaky result. func (f *futureImpl) BlockAndLeakyGet() (unsafe.Pointer, error) { f.blockUntilReady() - if !f.state.intoConsumed() { + guard := f.state.LockForConsume() + if guard == nil { return nil, ErrConsumed } + defer guard.Unlock() var ptr unsafe.Pointer var status C.CStatus @@ -144,21 +147,31 @@ func (f *futureImpl) BlockAndLeakyGet() (unsafe.Pointer, error) { return ptr, err } +// Release the resource of the future. func (f *futureImpl) Release() { // block until ready to release the future. f.blockUntilReady() + + guard := f.state.LockForRelease() + if guard == nil { + return + } + defer guard.Unlock() + // release the future. getCGOCaller().call("future_destroy", func() { C.future_destroy(f.future) }) } +// cancel the future with error. func (f *futureImpl) cancel(err error) { - if !f.state.checkUnready() { - // only unready future can be canceled. - // a ready future' cancel make no sense. + // only unready future can be canceled. + guard := f.state.LockForCancel() + if guard == nil { return } + defer guard.Unlock() if errors.IsAny(err, context.DeadlineExceeded, context.Canceled) { getCGOCaller().call("future_cancel", func() { @@ -169,8 +182,9 @@ func (f *futureImpl) cancel(err error) { panic("unreachable: invalid cancel error type") } +// blockUntilReady block until the future is ready or canceled. func (f *futureImpl) blockUntilReady() { - if !f.state.checkUnready() { + if !f.state.CheckUnready() { // only unready future should be block until ready. return } @@ -183,10 +197,7 @@ func (f *futureImpl) blockUntilReady() { mu.Lock() // mark the future as ready at go side to avoid more cgo calls. - f.state.intoReady() + f.state.IntoReady() // notify the future manager that the future is ready. f.ctxCancel() - if f.opts.releaser != nil { - f.releaserOnce.Do(f.opts.releaser) - } } diff --git a/internal/util/cgo/futures_test.go b/internal/util/cgo/futures_test.go index 5f2a6360bc..ce6c6551b4 100644 --- a/internal/util/cgo/futures_test.go +++ b/internal/util/cgo/futures_test.go @@ -25,6 +25,32 @@ func TestMain(m *testing.M) { } } +func TestFutureWithConcurrentReleaseAndCancel(t *testing.T) { + wg := sync.WaitGroup{} + for i := 0; i < 20; i++ { + future := createFutureWithTestCase(context.Background(), testCase{ + interval: 100 * time.Millisecond, + loopCnt: 10, + caseNo: 100, + }) + wg.Add(3) + // Double release should be ok. + go func() { + defer wg.Done() + future.Release() + }() + go func() { + defer wg.Done() + future.Release() + }() + go func() { + defer wg.Done() + future.cancel(context.DeadlineExceeded) + }() + } + wg.Wait() +} + func TestFutureWithSuccessCase(t *testing.T) { // Test success case. future := createFutureWithTestCase(context.Background(), testCase{ diff --git a/internal/util/cgo/options.go b/internal/util/cgo/options.go index 96c25c4357..721e45f354 100644 --- a/internal/util/cgo/options.go +++ b/internal/util/cgo/options.go @@ -2,27 +2,17 @@ package cgo func getDefaultOpt() *options { return &options{ - name: "unknown", - releaser: nil, + name: "unknown", } } type options struct { - name string - releaser func() + name string } // Opt is the option type for future. type Opt func(*options) -// WithReleaser sets the releaser function. -// When a future is ready, the releaser function will be called once. -func WithReleaser(releaser func()) Opt { - return func(o *options) { - o.releaser = releaser - } -} - // WithName sets the name of the future. // Only used for metrics. func WithName(name string) Opt { diff --git a/internal/util/cgo/state.go b/internal/util/cgo/state.go index db262c4b60..86e9fd4579 100644 --- a/internal/util/cgo/state.go +++ b/internal/util/cgo/state.go @@ -1,38 +1,99 @@ package cgo -import "go.uber.org/atomic" +import ( + "sync" +) const ( - stateUnready int32 = iota + stateUnready state = iota stateReady stateConsumed + stateDestoryed ) // newFutureState creates a new futureState. func newFutureState() futureState { return futureState{ - inner: atomic.NewInt32(stateUnready), + mu: sync.Mutex{}, + inner: stateUnready, } } +type state int32 + // futureState is a state machine for future. // unready --BlockUntilReady--> ready --BlockAndLeakyGet--> consumed type futureState struct { - inner *atomic.Int32 + mu sync.Mutex + inner state } -// intoReady sets the state to ready. -func (s *futureState) intoReady() { - s.inner.CompareAndSwap(stateUnready, stateReady) +// LockForCancel locks the state for cancel. +func (s *futureState) LockForCancel() *lockGuard { + s.mu.Lock() + // only unready future can be canceled. + // cancel on a ready future make no sense. + if s.inner != stateUnready { + s.mu.Unlock() + return nil + } + return &lockGuard{ + locker: s, + target: stateUnready, + } } -// intoConsumed sets the state to consumed. -// if the state is not ready, it does nothing and returns false. -func (s *futureState) intoConsumed() bool { - return s.inner.CompareAndSwap(stateReady, stateConsumed) +// LockForConsume locks the state for consume. +func (s *futureState) LockForConsume() *lockGuard { + s.mu.Lock() + if s.inner != stateReady { + s.mu.Unlock() + return nil + } + return &lockGuard{ + locker: s, + target: stateConsumed, + } +} + +// LockForRelease locks the state for release. +func (s *futureState) LockForRelease() *lockGuard { + s.mu.Lock() + if s.inner != stateReady && s.inner != stateConsumed { + s.mu.Unlock() + return nil + } + return &lockGuard{ + locker: s, + target: stateDestoryed, + } } // checkUnready checks if the state is unready. -func (s *futureState) checkUnready() bool { - return s.inner.Load() == stateUnready +func (s *futureState) CheckUnready() bool { + s.mu.Lock() + defer s.mu.Unlock() + + return s.inner == stateUnready +} + +// IntoReady changes the state to ready. +func (s *futureState) IntoReady() { + s.mu.Lock() + if s.inner == stateUnready { + s.inner = stateReady + } + s.mu.Unlock() +} + +// lockGuard is a guard for futureState. +type lockGuard struct { + locker *futureState + target state +} + +// Unlock unlocks the state. +func (lg *lockGuard) Unlock() { + lg.locker.inner = lg.target + lg.locker.mu.Unlock() }