mirror of https://github.com/milvus-io/milvus.git
parent
6a11bcf499
commit
8b1ae98fa9
|
@ -18,11 +18,9 @@ import (
|
|||
|
||||
"go.uber.org/zap"
|
||||
|
||||
rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
)
|
||||
|
||||
|
@ -32,12 +30,16 @@ const (
|
|||
|
||||
type UniqueID = typeutil.UniqueID
|
||||
|
||||
type idAllocatorInterface interface {
|
||||
AllocID(ctx context.Context, req *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error)
|
||||
}
|
||||
|
||||
type IDAllocator struct {
|
||||
Allocator
|
||||
|
||||
etcdEndpoints []string
|
||||
metaRoot string
|
||||
rootCoordClient types.RootCoord
|
||||
etcdEndpoints []string
|
||||
metaRoot string
|
||||
idAllocator idAllocatorInterface
|
||||
|
||||
countPerRPC uint32
|
||||
|
||||
|
@ -47,7 +49,7 @@ type IDAllocator struct {
|
|||
PeerID UniqueID
|
||||
}
|
||||
|
||||
func NewIDAllocator(ctx context.Context, metaRoot string, etcdEndpoints []string) (*IDAllocator, error) {
|
||||
func NewIDAllocator(ctx context.Context, idAlloctor idAllocatorInterface, peerID UniqueID) (*IDAllocator, error) {
|
||||
ctx1, cancel := context.WithCancel(ctx)
|
||||
a := &IDAllocator{
|
||||
Allocator: Allocator{
|
||||
|
@ -55,9 +57,9 @@ func NewIDAllocator(ctx context.Context, metaRoot string, etcdEndpoints []string
|
|||
CancelFunc: cancel,
|
||||
Role: "IDAllocator",
|
||||
},
|
||||
countPerRPC: IDCountPerRPC,
|
||||
metaRoot: metaRoot,
|
||||
etcdEndpoints: etcdEndpoints,
|
||||
countPerRPC: IDCountPerRPC,
|
||||
idAllocator: idAlloctor,
|
||||
PeerID: peerID,
|
||||
}
|
||||
a.TChan = &EmptyTicker{}
|
||||
a.Allocator.SyncFunc = a.syncID
|
||||
|
@ -69,20 +71,6 @@ func NewIDAllocator(ctx context.Context, metaRoot string, etcdEndpoints []string
|
|||
}
|
||||
|
||||
func (ia *IDAllocator) Start() error {
|
||||
var err error
|
||||
|
||||
ia.rootCoordClient, err = rcc.NewClient(ia.Ctx, ia.metaRoot, ia.etcdEndpoints)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err = ia.rootCoordClient.Init(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err = ia.rootCoordClient.Start(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return ia.Allocator.Start()
|
||||
}
|
||||
|
||||
|
@ -112,7 +100,7 @@ func (ia *IDAllocator) syncID() (bool, error) {
|
|||
},
|
||||
Count: need,
|
||||
}
|
||||
resp, err := ia.rootCoordClient.AllocID(ctx, req)
|
||||
resp, err := ia.idAllocator.AllocID(ctx, req)
|
||||
|
||||
cancel()
|
||||
if err != nil {
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
package allocator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
type mockIDAllocator struct {
|
||||
}
|
||||
|
||||
func (tso *mockIDAllocator) AllocID(ctx context.Context, req *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) {
|
||||
return &rootcoordpb.AllocIDResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
ID: int64(1),
|
||||
Count: req.Count,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newMockIDAllocator() *mockIDAllocator {
|
||||
return &mockIDAllocator{}
|
||||
}
|
||||
|
||||
func TestIDAllocator(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
mockIDAllocator := newMockIDAllocator()
|
||||
|
||||
idAllocator, err := NewIDAllocator(ctx, mockIDAllocator, int64(1))
|
||||
assert.Nil(t, err)
|
||||
err = idAllocator.Start()
|
||||
assert.Nil(t, err)
|
||||
|
||||
idStart, idEnd, err := idAllocator.Alloc(20000)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, idStart, int64(1))
|
||||
assert.Equal(t, idEnd, int64(20001))
|
||||
|
||||
id, err := idAllocator.AllocOne()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, id, int64(20001))
|
||||
|
||||
id, err = idAllocator.AllocOne()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, id, int64(20002))
|
||||
}
|
|
@ -163,13 +163,12 @@ func (node *Proxy) Init() error {
|
|||
return err
|
||||
}
|
||||
|
||||
idAllocator, err := allocator.NewIDAllocator(node.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
|
||||
|
||||
idAllocator, err := allocator.NewIDAllocator(node.ctx, node.rootCoord, Params.ProxyID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
node.idAllocator = idAllocator
|
||||
node.idAllocator.PeerID = Params.ProxyID
|
||||
|
||||
tsoAllocator, err := NewTimestampAllocator(node.ctx, node.rootCoord, Params.ProxyID)
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue