Add comments for concurrency package (#16654)

Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/16624/head
yah01 2022-04-26 11:33:45 +08:00 committed by GitHub
parent 955b9a31d5
commit 80ae6de323
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 76 additions and 4 deletions

View File

@ -322,7 +322,7 @@ func (loader *segmentLoader) loadSealedFields(segment *Segment, fields []*datapb
futures = append(futures, future) futures = append(futures, future)
} }
return concurrency.AwaitAll(futures) return concurrency.AwaitAll(futures...)
} }
func (loader *segmentLoader) loadGrowingFields(segment *Segment, fieldBinlogs []*datapb.FieldBinlog) error { func (loader *segmentLoader) loadGrowingFields(segment *Segment, fieldBinlogs []*datapb.FieldBinlog) error {
@ -444,7 +444,7 @@ func (loader *segmentLoader) loadFieldIndexData(segment *Segment, indexInfo *que
} }
} }
err := concurrency.AwaitAll(futures) err := concurrency.AwaitAll(futures...)
if err != nil { if err != nil {
return err return err
} }

View File

@ -1,5 +1,24 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package concurrency package concurrency
// Future is a result type of async-await style.
// It contains the result (or error) of an async task.
// Trying to obtain the result (or error) blocks until the async task completes.
type Future struct { type Future struct {
ch chan struct{} ch chan struct{}
value interface{} value interface{}
@ -12,35 +31,48 @@ func newFuture() *Future {
} }
} }
// Return the result and error of the async task.
func (future *Future) Await() (interface{}, error) { func (future *Future) Await() (interface{}, error) {
<-future.ch <-future.ch
return future.value, future.err return future.value, future.err
} }
// Return the result of the async task,
// nil if no result or error occurred.
func (future *Future) Value() interface{} { func (future *Future) Value() interface{} {
<-future.ch <-future.ch
return future.value return future.value
} }
// True if error occurred,
// false otherwise.
func (future *Future) OK() bool { func (future *Future) OK() bool {
<-future.ch <-future.ch
return future.err == nil return future.err == nil
} }
// Return the error of the async task,
// nil if no error.
func (future *Future) Err() error { func (future *Future) Err() error {
<-future.ch <-future.ch
return future.err return future.err
} }
// Return a read-only channel,
// which will be closed if the async task completes.
// Use this if you need to wait the async task in a select statement.
func (future *Future) Inner() <-chan struct{} { func (future *Future) Inner() <-chan struct{} {
return future.ch return future.ch
} }
func AwaitAll(futures []*Future) error { // Await for multiple futures,
// Return nil if no future returns error,
// or return the first error in these futures.
func AwaitAll(futures ...*Future) error {
for i := range futures { for i := range futures {
if !futures[i].OK() { if !futures[i].OK() {
return futures[i].err return futures[i].err

View File

@ -1,11 +1,30 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package concurrency package concurrency
import "github.com/panjf2000/ants/v2" import "github.com/panjf2000/ants/v2"
// A goroutine pool
type Pool struct { type Pool struct {
inner *ants.Pool inner *ants.Pool
} }
// Return error if provides invalid parameters
// cap: the number of workers
func NewPool(cap int, opts ...ants.Option) (*Pool, error) { func NewPool(cap int, opts ...ants.Option) (*Pool, error) {
pool, err := ants.NewPool(cap, opts...) pool, err := ants.NewPool(cap, opts...)
if err != nil { if err != nil {
@ -17,6 +36,9 @@ func NewPool(cap int, opts ...ants.Option) (*Pool, error) {
}, nil }, nil
} }
// Submit a task into the pool,
// executes it asynchronously.
// This will block if the pool has finite workers and no idle worker.
func (pool *Pool) Submit(method func() (interface{}, error)) *Future { func (pool *Pool) Submit(method func() (interface{}, error)) *Future {
future := newFuture() future := newFuture()
err := pool.inner.Submit(func() { err := pool.inner.Submit(func() {
@ -36,10 +58,12 @@ func (pool *Pool) Submit(method func() (interface{}, error)) *Future {
return future return future
} }
// The number of workers
func (pool *Pool) Cap() int { func (pool *Pool) Cap() int {
return pool.inner.Cap() return pool.inner.Cap()
} }
// The number of running workers
func (pool *Pool) Running() int { func (pool *Pool) Running() int {
return pool.inner.Running() return pool.inner.Running()
} }

View File

@ -1,3 +1,19 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package concurrency package concurrency
import ( import (
@ -24,7 +40,7 @@ func TestPool(t *testing.T) {
} }
assert.Greater(t, pool.Running(), 0) assert.Greater(t, pool.Running(), 0)
AwaitAll(futures) AwaitAll(futures...)
for i, future := range futures { for i, future := range futures {
res, err := future.Await() res, err := future.Await()
assert.NoError(t, err) assert.NoError(t, err)