milvus/internal/allocator/allocator.go

279 lines
5.1 KiB
Go

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package allocator
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
const (
maxConcurrentRequests = 10000
)
type Request interface {
Wait() error
Notify(error)
}
type BaseRequest struct {
Done chan error
Valid bool
}
func (req *BaseRequest) Wait() error {
err := <-req.Done
return err
}
func (req *BaseRequest) Notify(err error) {
req.Done <- err
}
type IDRequest struct {
BaseRequest
id UniqueID
count uint32
}
type SyncRequest struct {
BaseRequest
}
type TickerChan interface {
Chan() <-chan time.Time
Close()
Init()
Reset()
}
type EmptyTicker struct {
tChan <-chan time.Time
}
func (t *EmptyTicker) Chan() <-chan time.Time {
return t.tChan
}
func (t *EmptyTicker) Init() {
}
func (t *EmptyTicker) Reset() {
}
func (t *EmptyTicker) Close() {
}
type Ticker struct {
ticker *time.Ticker
UpdateInterval time.Duration
}
func (t *Ticker) Init() {
t.ticker = time.NewTicker(t.UpdateInterval)
}
func (t *Ticker) Reset() {
t.ticker.Reset(t.UpdateInterval)
}
func (t *Ticker) Close() {
t.ticker.Stop()
}
func (t *Ticker) Chan() <-chan time.Time {
return t.ticker.C
}
type Allocator struct {
Ctx context.Context
CancelFunc context.CancelFunc
wg sync.WaitGroup
Reqs chan Request
ToDoReqs []Request
CanDoReqs []Request
SyncReqs []Request
TChan TickerChan
ForceSyncChan chan Request
SyncFunc func() (bool, error)
ProcessFunc func(req Request) error
CheckSyncFunc func(timeout bool) bool
PickCanDoFunc func()
SyncErr error
Role string
}
func (ta *Allocator) Start() error {
ta.TChan.Init()
ta.wg.Add(1)
go ta.mainLoop()
return nil
}
func (ta *Allocator) Init() {
ta.ForceSyncChan = make(chan Request, maxConcurrentRequests)
ta.Reqs = make(chan Request, maxConcurrentRequests)
}
func (ta *Allocator) mainLoop() {
defer ta.wg.Done()
loopCtx, loopCancel := context.WithCancel(ta.Ctx)
defer loopCancel()
for {
select {
case first := <-ta.ForceSyncChan:
ta.SyncReqs = append(ta.SyncReqs, first)
pending := len(ta.ForceSyncChan)
for i := 0; i < pending; i++ {
ta.SyncReqs = append(ta.SyncReqs, <-ta.ForceSyncChan)
}
ta.sync(true)
ta.finishSyncRequest()
case <-ta.TChan.Chan():
ta.pickCanDo()
ta.finishRequest()
if ta.sync(true) {
ta.pickCanDo()
ta.finishRequest()
}
ta.failRemainRequest()
case first := <-ta.Reqs:
ta.ToDoReqs = append(ta.ToDoReqs, first)
pending := len(ta.Reqs)
for i := 0; i < pending; i++ {
ta.ToDoReqs = append(ta.ToDoReqs, <-ta.Reqs)
}
ta.pickCanDo()
ta.finishRequest()
if ta.sync(false) {
ta.pickCanDo()
ta.finishRequest()
}
ta.failRemainRequest()
case <-loopCtx.Done():
return
}
}
}
func (ta *Allocator) pickCanDo() {
if ta.PickCanDoFunc == nil {
return
}
ta.PickCanDoFunc()
}
func (ta *Allocator) sync(timeout bool) bool {
if ta.SyncFunc == nil || ta.CheckSyncFunc == nil {
ta.CanDoReqs = ta.ToDoReqs
ta.ToDoReqs = nil
return true
}
if !timeout && len(ta.ToDoReqs) == 0 {
return false
}
if !ta.CheckSyncFunc(timeout) {
return false
}
var ret bool
ret, ta.SyncErr = ta.SyncFunc()
if !timeout {
ta.TChan.Reset()
}
return ret
}
func (ta *Allocator) finishSyncRequest() {
for _, req := range ta.SyncReqs {
if req != nil {
req.Notify(nil)
}
}
ta.SyncReqs = nil
}
func (ta *Allocator) failRemainRequest() {
var err error
if ta.SyncErr != nil {
err = fmt.Errorf("%s failRemainRequest err:%w", ta.Role, ta.SyncErr)
} else {
errMsg := fmt.Sprintf("%s failRemainRequest unexpected error", ta.Role)
err = errors.New(errMsg)
}
if len(ta.ToDoReqs) > 0 {
log.Debug("Allocator has some reqs to fail",
zap.Any("Role", ta.Role),
zap.Any("reqLen", len(ta.ToDoReqs)))
}
for _, req := range ta.ToDoReqs {
if req != nil {
req.Notify(err)
}
}
ta.ToDoReqs = nil
}
func (ta *Allocator) finishRequest() {
for _, req := range ta.CanDoReqs {
if req != nil {
err := ta.ProcessFunc(req)
req.Notify(err)
}
}
ta.CanDoReqs = []Request{}
}
func (ta *Allocator) revokeRequest(err error) {
n := len(ta.Reqs)
for i := 0; i < n; i++ {
req := <-ta.Reqs
req.Notify(err)
}
}
func (ta *Allocator) Close() {
ta.CancelFunc()
ta.wg.Wait()
ta.TChan.Close()
errMsg := fmt.Sprintf("%s is closing", ta.Role)
ta.revokeRequest(errors.New(errMsg))
}
func (ta *Allocator) CleanCache() {
req := &SyncRequest{
BaseRequest: BaseRequest{
Done: make(chan error),
Valid: false,
},
}
ta.ForceSyncChan <- req
_ = req.Wait()
}