Cancel local timestamp caching logic (#5327)

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
pull/5334/head^2
zhenshan.cao 2021-05-21 13:11:21 +08:00 committed by GitHub
parent ac0b878531
commit 70241a8bf9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 83 additions and 203 deletions

4
go.mod
View File

@ -6,10 +6,7 @@ require (
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
github.com/antonmedv/expr v1.8.9
github.com/apache/pulsar-client-go v0.4.0
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6 // indirect
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b // indirect
github.com/coreos/etcd v3.3.13+incompatible
github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32 // indirect
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect
@ -26,6 +23,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
github.com/pkg/errors v0.9.1
github.com/quasilyte/go-ruleguard v0.2.1 // indirect
github.com/sirupsen/logrus v1.6.0 // indirect
github.com/spaolacci/murmur3 v1.1.0
github.com/spf13/cast v1.3.0

6
go.sum
View File

@ -316,6 +316,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/quasilyte/go-ruleguard v0.2.1 h1:56eRm0daAyny9UhJnmtJW/UyLZQusukBAB8oT8AHKHo=
github.com/quasilyte/go-ruleguard v0.2.1/go.mod h1:hN2rVc/uS4bQhQKTio2XaSJSafJwqBUWWwtssT3cQmc=
github.com/rivo/tview v0.0.0-20200219210816-cd38d7432498/go.mod h1:6lkG1x+13OShEf0EaOCaTQYyB7d5nSbb181KtjlS+84=
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
@ -376,6 +378,7 @@ github.com/yahoo/athenz v1.8.55/go.mod h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhD
github.com/yahoo/athenz v1.9.16 h1:2s8KtIxwAbcJIYySsfrT/t/WO0Ss5O7BPGUN/q8x2bg=
github.com/yahoo/athenz v1.9.16/go.mod h1:guj+0Ut6F33wj+OcSRlw69O0itsR7tVocv15F2wJnIo=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
@ -449,6 +452,7 @@ golang.org/x/net v0.0.0-20190921015927-1a5e07d1ff72/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U=
@ -464,6 +468,7 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@ -527,6 +532,7 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200812195022-5ae4c3c160a0/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@ -51,12 +51,6 @@ type IDRequest struct {
count uint32
}
type TSORequest struct {
BaseRequest
timestamp Timestamp
count uint32
}
type SyncRequest struct {
BaseRequest
}

View File

@ -1,182 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package allocator
import (
"context"
"fmt"
"time"
"go.uber.org/zap"
msc "github.com/milvus-io/milvus/internal/distributed/masterservice/client"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/masterpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
type Timestamp = typeutil.Timestamp
const (
tsCountPerRPC = 2 << 15
)
type TimestampAllocator struct {
Allocator
masterAddress string
masterClient types.MasterService
countPerRPC uint32
lastTsBegin Timestamp
lastTsEnd Timestamp
PeerID UniqueID
}
func NewTimestampAllocator(ctx context.Context, masterAddr string) (*TimestampAllocator, error) {
ctx1, cancel := context.WithCancel(ctx)
a := &TimestampAllocator{
Allocator: Allocator{
Ctx: ctx1,
CancelFunc: cancel,
Role: "TimestampAllocator",
},
masterAddress: masterAddr,
countPerRPC: tsCountPerRPC,
}
a.TChan = &Ticker{
UpdateInterval: time.Second,
}
a.Allocator.SyncFunc = a.syncTs
a.Allocator.ProcessFunc = a.processFunc
a.Allocator.CheckSyncFunc = a.checkSyncFunc
a.Allocator.PickCanDoFunc = a.pickCanDoFunc
a.Init()
return a, nil
}
func (ta *TimestampAllocator) Start() error {
var err error
ta.masterClient, err = msc.NewClient(ta.masterAddress, 20*time.Second)
if err != nil {
panic(err)
}
if err = ta.masterClient.Init(); err != nil {
panic(err)
}
if err = ta.masterClient.Start(); err != nil {
panic(err)
}
return ta.Allocator.Start()
}
func (ta *TimestampAllocator) checkSyncFunc(timeout bool) bool {
return timeout || len(ta.ToDoReqs) > 0
}
func (ta *TimestampAllocator) pickCanDoFunc() {
total := uint32(ta.lastTsEnd - ta.lastTsBegin)
need := uint32(0)
idx := 0
for _, req := range ta.ToDoReqs {
tReq := req.(*TSORequest)
need += tReq.count
if need <= total {
ta.CanDoReqs = append(ta.CanDoReqs, req)
idx++
} else {
break
}
}
ta.ToDoReqs = ta.ToDoReqs[idx:]
log.Debug("TimestampAllocator pickCanDoFunc",
zap.Any("need", need),
zap.Any("total", total),
zap.Any("remainReqCnt", len(ta.ToDoReqs)))
}
func (ta *TimestampAllocator) gatherReqTsCount() uint32 {
need := uint32(0)
for _, req := range ta.ToDoReqs {
tReq := req.(*TSORequest)
need += tReq.count
}
return need
}
func (ta *TimestampAllocator) syncTs() (bool, error) {
need := ta.gatherReqTsCount()
if need < ta.countPerRPC {
need = ta.countPerRPC
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
req := &masterpb.AllocTimestampRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_RequestTSO,
MsgID: 0,
Timestamp: 0,
SourceID: ta.PeerID,
},
Count: need,
}
resp, err := ta.masterClient.AllocTimestamp(ctx, req)
defer cancel()
if err != nil {
return false, fmt.Errorf("syncTimestamp Failed:%w", err)
}
ta.lastTsBegin = resp.GetTimestamp()
ta.lastTsEnd = ta.lastTsBegin + uint64(resp.GetCount())
return true, nil
}
func (ta *TimestampAllocator) processFunc(req Request) error {
tsoRequest := req.(*TSORequest)
tsoRequest.timestamp = ta.lastTsBegin
ta.lastTsBegin++
return nil
}
func (ta *TimestampAllocator) AllocOne() (Timestamp, error) {
ret, err := ta.Alloc(1)
if err != nil {
return 0, err
}
return ret[0], nil
}
func (ta *TimestampAllocator) Alloc(count uint32) ([]Timestamp, error) {
req := &TSORequest{
BaseRequest: BaseRequest{Done: make(chan error), Valid: false},
}
req.count = count
ta.Reqs <- req
if err := req.Wait(); err != nil {
return nil, fmt.Errorf("alloc time stamp request failed: %s", err)
}
start, count := req.timestamp, req.count
var ret []Timestamp
for i := uint32(0); i < count; i++ {
ret = append(ret, start+uint64(i))
}
return ret, nil
}
func (ta *TimestampAllocator) ClearCache() {
}

View File

@ -56,7 +56,7 @@ type ProxyNode struct {
tick *timeTick
idAllocator *allocator.IDAllocator
tsoAllocator *allocator.TimestampAllocator
tsoAllocator *TimestampAllocator
segAssigner *SegIDAssigner
queryMsgStream msgstream.MsgStream
@ -178,12 +178,11 @@ func (node *ProxyNode) Init() error {
node.idAllocator = idAllocator
node.idAllocator.PeerID = Params.ProxyID
tsoAllocator, err := allocator.NewTimestampAllocator(node.ctx, masterAddr)
tsoAllocator, err := NewTimestampAllocator(node.masterService, Params.ProxyID)
if err != nil {
return err
}
node.tsoAllocator = tsoAllocator
node.tsoAllocator.PeerID = Params.ProxyID
segAssigner, err := NewSegIDAssigner(node.ctx, node.dataService, node.lastTick)
if err != nil {
@ -221,9 +220,6 @@ func (node *ProxyNode) Start() error {
node.idAllocator.Start()
log.Debug("start id allocator ...")
node.tsoAllocator.Start()
log.Debug("start tso allocator ...")
node.segAssigner.Start()
log.Debug("start seg assigner ...")
@ -247,7 +243,6 @@ func (node *ProxyNode) Stop() error {
node.cancel()
globalInsertChannelsMap.CloseAllMsgStream()
node.tsoAllocator.Close()
node.idAllocator.Close()
node.segAssigner.Close()
node.sched.Close()

View File

@ -259,7 +259,7 @@ type TaskScheduler struct {
DqQueue TaskQueue
idAllocator *allocator.IDAllocator
tsoAllocator *allocator.TimestampAllocator
tsoAllocator *TimestampAllocator
wg sync.WaitGroup
ctx context.Context
@ -270,7 +270,7 @@ type TaskScheduler struct {
func NewTaskScheduler(ctx context.Context,
idAllocator *allocator.IDAllocator,
tsoAllocator *allocator.TimestampAllocator,
tsoAllocator *TimestampAllocator,
factory msgstream.Factory) (*TaskScheduler, error) {
ctx1, cancel := context.WithCancel(ctx)
s := &TaskScheduler{

View File

@ -0,0 +1,70 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package proxynode
import (
"context"
"fmt"
"time"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/masterpb"
"github.com/milvus-io/milvus/internal/types"
)
type TimestampAllocator struct {
masterService types.MasterService
peerID UniqueID
}
func NewTimestampAllocator(master types.MasterService, peerID UniqueID) (*TimestampAllocator, error) {
a := &TimestampAllocator{
peerID: peerID,
masterService: master,
}
return a, nil
}
func (ta *TimestampAllocator) Alloc(count uint32) ([]Timestamp, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
req := &masterpb.AllocTimestampRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_RequestTSO,
MsgID: 0,
Timestamp: 0,
SourceID: ta.peerID,
},
Count: count,
}
resp, err := ta.masterService.AllocTimestamp(ctx, req)
defer cancel()
if err != nil {
return nil, fmt.Errorf("syncTimestamp Failed:%w", err)
}
start, cnt := resp.Timestamp, resp.Count
var ret []Timestamp
for i := uint32(0); i < cnt; i++ {
ret = append(ret, start+uint64(i))
}
return ret, nil
}
func (ta *TimestampAllocator) AllocOne() (Timestamp, error) {
ret, err := ta.Alloc(1)
if err != nil {
return 0, err
}
return ret[0], nil
}

View File

@ -19,7 +19,6 @@ import (
"go.uber.org/zap"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -35,7 +34,7 @@ type timeTick struct {
pulsarProducer pulsar.Producer
tsoAllocator *allocator.TimestampAllocator
tsoAllocator *TimestampAllocator
tickMsgStream msgstream.MsgStream
msFactory msgstream.Factory
@ -49,7 +48,7 @@ type timeTick struct {
}
func newTimeTick(ctx context.Context,
tsoAllocator *allocator.TimestampAllocator,
tsoAllocator *TimestampAllocator,
interval time.Duration,
checkFunc tickCheckFunc,
factory msgstream.Factory) *timeTick {