Add grpc check logic for master

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
pull/4973/head^2
zhenshan.cao 2020-11-13 16:53:55 +08:00 committed by yefu.chen
parent 83e37d6e36
commit 595d827155
14 changed files with 445 additions and 519 deletions

.gitignore vendored
View File

@ -51,6 +51,7 @@ cmake_build/
*.log
.coverage
*.pyc
*.log
.DS_Store
*.swp

View File

@ -49,6 +49,8 @@ type Master struct {
// chans
ssChan chan internalpb.SegmentStats
grpcErr chan error
kvBase *kv.EtcdKV
scheduler *ddRequestScheduler
mt *metaTable
@ -94,6 +96,7 @@ func CreateServer(ctx context.Context, kvRootPath string, metaRootPath, tsoRootP
scheduler: NewDDRequestScheduler(),
mt: metakv,
ssChan: make(chan internalpb.SegmentStats, 10),
grpcErr: make(chan error),
pc: informer.NewPulsarClient(),
}
m.grpcServer = grpc.NewServer()
@ -106,18 +109,6 @@ func (s *Master) AddStartCallback(callbacks ...func()) {
s.startCallbacks = append(s.startCallbacks, callbacks...)
}
func (s *Master) startServer(ctx context.Context) error {
// Run callbacks
for _, cb := range s.startCallbacks {
cb()
}
// Server has started.
atomic.StoreInt64(&s.isServing, 1)
return nil
}
// AddCloseCallback adds a callback in the Close phase.
func (s *Master) AddCloseCallback(callbacks ...func()) {
s.closeCallbacks = append(s.closeCallbacks, callbacks...)
@ -154,11 +145,15 @@ func (s *Master) IsClosed() bool {
// Run runs the pd server.
func (s *Master) Run(grpcPort int64) error {
if err := s.startServer(s.ctx); err != nil {
if err := s.startServerLoop(s.ctx, grpcPort); err != nil {
return err
}
atomic.StoreInt64(&s.isServing, 1)
s.startServerLoop(s.ctx, grpcPort)
// Run callbacks
for _, cb := range s.startCallbacks {
cb()
}
return nil
}
@ -173,22 +168,23 @@ func (s *Master) LoopContext() context.Context {
return s.serverLoopCtx
}
func (s *Master) startServerLoop(ctx context.Context, grpcPort int64) {
func (s *Master) startServerLoop(ctx context.Context, grpcPort int64) error {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(ctx)
//go s.Se
s.serverLoopWg.Add(1)
go s.grpcLoop(grpcPort)
//s.serverLoopWg.Add(1)
//go s.pulsarLoop()
if err := <-s.grpcErr; err != nil {
return err
}
s.serverLoopWg.Add(1)
go s.tasksExecutionLoop()
s.serverLoopWg.Add(1)
go s.segmentStatisticsLoop()
return nil
}
func (s *Master) stopServerLoop() {
@ -205,6 +201,15 @@ func (s *Master) StartTimestamp() int64 {
return s.startTimestamp
}
func (s *Master) checkGrpcReady(ctx context.Context, targetCh chan error) {
select {
case <-time.After(100 * time.Millisecond):
targetCh <- nil
case <-ctx.Done():
return
}
}
func (s *Master) grpcLoop(grpcPort int64) {
defer s.serverLoopWg.Done()
@ -213,11 +218,14 @@ func (s *Master) grpcLoop(grpcPort int64) {
lis, err := net.Listen("tcp", defaultGRPCPort)
if err != nil {
log.Printf("failed to listen: %v", err)
s.grpcErr <- err
return
}
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
go s.checkGrpcReady(ctx, s.grpcErr)
if err := s.grpcServer.Serve(lis); err != nil {
panic("grpcServer Start Failed!!")
s.grpcErr <- err
}
}
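Note on the hunks above: grpcLoop now reports listen/serve failures on the grpcErr channel, and a companion goroutine sends nil after a 100 ms grace period so startServerLoop can block until the server is either up or has failed. A minimal, self-contained sketch of that pattern follows; serveWithReadiness and checkReady are illustrative names, not identifiers from this patch.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"time"

	"google.golang.org/grpc"
)

// checkReady mirrors the 100 ms probe added in this patch: if the server has
// not failed within the grace period, report success on targetCh.
func checkReady(ctx context.Context, targetCh chan error) {
	select {
	case <-time.After(100 * time.Millisecond):
		targetCh <- nil
	case <-ctx.Done():
		return
	}
}

// serveWithReadiness starts a gRPC server and blocks until it is either
// serving (nil) or has failed to start (non-nil), much like Run and
// startServerLoop do with grpcErr after this patch.
func serveWithReadiness(ctx context.Context, addr string) error {
	errCh := make(chan error)

	go func() {
		lis, err := net.Listen("tcp", addr)
		if err != nil {
			errCh <- err // listen failures are reported, not only logged
			return
		}
		srvCtx, cancel := context.WithCancel(ctx)
		defer cancel()
		go checkReady(srvCtx, errCh)

		srv := grpc.NewServer()
		if err := srv.Serve(lis); err != nil {
			errCh <- err // Serve errors are reported instead of panicking
		}
	}()

	// As in the patch, only the first report is consumed here.
	return <-errCh
}

func main() {
	// A real server keeps running after this returns nil; the sketch only
	// demonstrates the startup handshake.
	if err := serveWithReadiness(context.Background(), ":53100"); err != nil {
		fmt.Println("gRPC server failed to start:", err)
	}
}
```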

View File

@ -6,25 +6,21 @@ import (
"log"
"math/rand"
"net"
"strconv"
"sync"
"sync/atomic"
"time"
"google.golang.org/grpc"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/kv/mockkv"
"github.com/zilliztech/milvus-distributed/internal/master/id"
"github.com/zilliztech/milvus-distributed/internal/master/tso"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
)
"google.golang.org/grpc"
const (
MOCKGRPCPORT = ":0"
"github.com/zilliztech/milvus-distributed/internal/master/tso"
)
var GrpcServerAddr net.Addr
// Server is the pd server.
type Master struct {
isServing int64
@ -42,6 +38,9 @@ type Master struct {
kvBase kv.Base
// error chans
grpcErr chan error
// Add callback functions at different stages
startCallbacks []func()
closeCallbacks []func()
@ -49,15 +48,19 @@ type Master struct {
grpcAddr net.Addr
}
func Init() {
rand.Seed(time.Now().UnixNano())
id.Init()
tso.Init()
}
// CreateServer creates the UNINITIALIZED pd server with given configuration.
func CreateServer(ctx context.Context) (*Master, error) {
rand.Seed(time.Now().UnixNano())
id.InitGlobalIDAllocator("idTimestamp", mockkv.NewEtcdKV())
Init()
m := &Master{
ctx: ctx,
kvBase: mockkv.NewEtcdKV(),
tsoAllocator: tso.NewGlobalTSOAllocator("timestamp", mockkv.NewEtcdKV()),
ctx: ctx,
kvBase: mockkv.NewEtcdKV(),
grpcErr: make(chan error),
}
m.grpcServer = grpc.NewServer()
@ -81,9 +84,6 @@ func (s *Master) startServer(ctx context.Context) error {
for _, cb := range s.startCallbacks {
cb()
}
// Server has started.
atomic.StoreInt64(&s.isServing, 1)
return nil
}
@ -115,6 +115,11 @@ func (s *Master) Close() {
log.Print("close server")
}
// IsServing checks whether the server is serving.
func (s *Master) IsServing() bool {
return !s.IsClosed()
}
// IsClosed checks whether server is closed or not.
func (s *Master) IsClosed() bool {
return atomic.LoadInt64(&s.isServing) == 0
@ -128,7 +133,14 @@ func (s *Master) Run() error {
}
s.startServerLoop(s.ctx)
// Server has started.
if err := <-s.grpcErr; err != nil {
s.Close()
return err
}
atomic.StoreInt64(&s.isServing, 1)
return nil
}
@ -144,9 +156,11 @@ func (s *Master) LoopContext() context.Context {
func (s *Master) startServerLoop(ctx context.Context) {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(ctx)
s.serverLoopWg.Add(3)
s.serverLoopWg.Add(1)
go s.grpcLoop()
s.serverLoopWg.Add(1)
go s.pulsarLoop()
s.serverLoopWg.Add(1)
go s.segmentStatisticsLoop()
}
@ -158,20 +172,34 @@ func (s *Master) stopServerLoop() {
s.serverLoopWg.Wait()
}
func (s *Master) grpcLoop() {
defer s.serverLoopWg.Done()
lis, err := net.Listen("tcp", MOCKGRPCPORT)
if err != nil {
log.Printf("failed to listen: %v", err)
func (s *Master) checkReady(ctx context.Context, targetCh chan error) {
select {
case <-time.After(100 * time.Millisecond):
targetCh <- nil
case <-ctx.Done():
return
}
}
func (s *Master) grpcLoop() {
defer s.serverLoopWg.Done()
masterAddr := conf.Config.Etcd.Address
masterAddr += ":"
masterAddr += strconv.FormatInt(int64(conf.Config.Master.Port), 10)
lis, err := net.Listen("tcp", masterAddr)
if err != nil {
log.Printf("failed to listen: %v", err)
s.grpcErr <- err
return
}
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
go s.checkReady(ctx, s.grpcErr)
s.grpcAddr = lis.Addr()
fmt.Printf("Start MockMaster grpc server , addr:%v\n", s.grpcAddr)
if err := s.grpcServer.Serve(lis); err != nil {
panic("grpcServer Startup Failed!")
fmt.Println(err)
s.grpcErr <- err
}
}
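In this file, Run only marks the server as serving after the first report on grpcErr arrives, and closes the server if that report is an error. A small sketch of that wrapper; server, Run, and startLoops are illustrative names assuming the patch's grpcErr/isServing fields.

```go
package main

import "sync/atomic"

// server mimics the fields this patch relies on; Close is a placeholder for
// the real cleanup (stopping loops, closing listeners, etc.).
type server struct {
	isServing int64
	grpcErr   chan error
}

func (s *server) Close() {}

// Run only flips isServing after the first report on grpcErr, and tears the
// server down if that report is an error.
func (s *server) Run(startLoops func()) error {
	startLoops() // launches the gRPC loop, which reports on s.grpcErr

	if err := <-s.grpcErr; err != nil {
		s.Close()
		return err
	}
	atomic.StoreInt64(&s.isServing, 1)
	return nil
}

func main() {
	s := &server{grpcErr: make(chan error)}
	_ = s.Run(func() {
		// Pretend the gRPC loop came up cleanly and passed its readiness probe.
		go func() { s.grpcErr <- nil }()
	})
}
```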

View File

@ -12,79 +12,38 @@ package reader
*/
import "C"
import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"strconv"
"sync"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
)
type container interface {
// collection
getCollectionNum() int
addCollection(collMeta *etcdpb.CollectionMeta, collMetaBlob string) error
removeCollection(collectionID UniqueID) error
getCollectionByID(collectionID UniqueID) (*Collection, error)
getCollectionByName(collectionName string) (*Collection, error)
// partition
// Partition tags in different collections are not unique,
// so partition api should specify the target collection.
addPartition(collectionID UniqueID, partitionTag string) error
removePartition(collectionID UniqueID, partitionTag string) error
getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error)
// segment
getSegmentNum() int
getSegmentStatistics() *internalpb.QueryNodeSegStats
addSegment(segmentID UniqueID, partitionTag string, collectionID UniqueID) error
removeSegment(segmentID UniqueID) error
getSegmentByID(segmentID UniqueID) (*Segment, error)
hasSegment(segmentID UniqueID) bool
}
// TODO: rename
type colSegContainer struct {
mu sync.RWMutex
type ColSegContainer struct {
collections []*Collection
segments map[UniqueID]*Segment
}
//----------------------------------------------------------------------------------------------------- collection
func (container *colSegContainer) getCollectionNum() int {
container.mu.RLock()
defer container.mu.RUnlock()
return len(container.collections)
}
func (container *colSegContainer) addCollection(collMeta *etcdpb.CollectionMeta, collMetaBlob string) error {
container.mu.Lock()
defer container.mu.Unlock()
func (container *ColSegContainer) addCollection(collMeta *etcdpb.CollectionMeta, collMetaBlob string) *Collection {
var newCollection = newCollection(collMeta, collMetaBlob)
container.collections = append(container.collections, newCollection)
return nil
return newCollection
}
func (container *colSegContainer) removeCollection(collectionID UniqueID) error {
collection, err := container.getCollectionByID(collectionID)
container.mu.Lock()
defer container.mu.Unlock()
if err != nil {
return err
func (container *ColSegContainer) removeCollection(collection *Collection) error {
if collection == nil {
return errors.New("null collection")
}
deleteCollection(collection)
collectionID := collection.ID()
tmpCollections := make([]*Collection, 0)
for _, col := range container.collections {
if col.ID() == collectionID {
for _, p := range *col.Partitions() {
for _, p := range *collection.Partitions() {
for _, s := range *p.Segments() {
delete(container.segments, s.ID())
}
@ -98,10 +57,7 @@ func (container *colSegContainer) removeCollection(collectionID UniqueID) error
return nil
}
func (container *colSegContainer) getCollectionByID(collectionID UniqueID) (*Collection, error) {
container.mu.RLock()
defer container.mu.RUnlock()
func (container *ColSegContainer) getCollectionByID(collectionID int64) (*Collection, error) {
for _, collection := range container.collections {
if collection.ID() == collectionID {
return collection, nil
@ -111,10 +67,7 @@ func (container *colSegContainer) getCollectionByID(collectionID UniqueID) (*Col
return nil, errors.New("cannot find collection, id = " + strconv.FormatInt(collectionID, 10))
}
func (container *colSegContainer) getCollectionByName(collectionName string) (*Collection, error) {
container.mu.RLock()
defer container.mu.RUnlock()
func (container *ColSegContainer) getCollectionByName(collectionName string) (*Collection, error) {
for _, collection := range container.collections {
if collection.Name() == collectionName {
return collection, nil
@ -125,55 +78,60 @@ func (container *colSegContainer) getCollectionByName(collectionName string) (*C
}
//----------------------------------------------------------------------------------------------------- partition
func (container *colSegContainer) addPartition(collectionID UniqueID, partitionTag string) error {
collection, err := container.getCollectionByID(collectionID)
if err != nil {
return err
func (container *ColSegContainer) addPartition(collection *Collection, partitionTag string) (*Partition, error) {
if collection == nil {
return nil, errors.New("null collection")
}
container.mu.Lock()
defer container.mu.Unlock()
var newPartition = newPartition(partitionTag)
*collection.Partitions() = append(*collection.Partitions(), newPartition)
return nil
}
func (container *colSegContainer) removePartition(collectionID UniqueID, partitionTag string) error {
collection, err := container.getCollectionByID(collectionID)
if err != nil {
return err
}
container.mu.Lock()
defer container.mu.Unlock()
var tmpPartitions = make([]*Partition, 0)
for _, p := range *collection.Partitions() {
if p.Tag() == partitionTag {
for _, s := range *p.Segments() {
delete(container.segments, s.ID())
}
} else {
tmpPartitions = append(tmpPartitions, p)
for _, col := range container.collections {
if col.Name() == collection.Name() {
*col.Partitions() = append(*col.Partitions(), newPartition)
return newPartition, nil
}
}
*collection.Partitions() = tmpPartitions
return nil
return nil, errors.New("cannot find collection, name = " + collection.Name())
}
func (container *colSegContainer) getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) {
collection, err := container.getCollectionByID(collectionID)
func (container *ColSegContainer) removePartition(partition *Partition) error {
if partition == nil {
return errors.New("null partition")
}
var targetCollection *Collection
var tmpPartitions = make([]*Partition, 0)
var hasPartition = false
for _, col := range container.collections {
for _, p := range *col.Partitions() {
if p.Tag() == partition.partitionTag {
targetCollection = col
hasPartition = true
for _, s := range *p.Segments() {
delete(container.segments, s.ID())
}
} else {
tmpPartitions = append(tmpPartitions, p)
}
}
}
if hasPartition && targetCollection != nil {
*targetCollection.Partitions() = tmpPartitions
return nil
}
return errors.New("cannot found partition, tag = " + partition.Tag())
}
func (container *ColSegContainer) getPartitionByTag(collectionName string, partitionTag string) (*Partition, error) {
targetCollection, err := container.getCollectionByName(collectionName)
if err != nil {
return nil, err
}
container.mu.RLock()
defer container.mu.RUnlock()
for _, p := range *collection.Partitions() {
for _, p := range *targetCollection.Partitions() {
if p.Tag() == partitionTag {
return p, nil
}
@ -183,90 +141,60 @@ func (container *colSegContainer) getPartitionByTag(collectionID UniqueID, parti
}
//----------------------------------------------------------------------------------------------------- segment
func (container *colSegContainer) getSegmentNum() int {
container.mu.RLock()
defer container.mu.RUnlock()
return len(container.segments)
}
func (container *colSegContainer) getSegmentStatistics() *internalpb.QueryNodeSegStats {
var statisticData = make([]*internalpb.SegmentStats, 0)
for segmentID, segment := range container.segments {
currentMemSize := segment.getMemSize()
segment.lastMemSize = currentMemSize
segmentNumOfRows := segment.getRowCount()
stat := internalpb.SegmentStats{
SegmentID: segmentID,
MemorySize: currentMemSize,
NumRows: segmentNumOfRows,
RecentlyModified: segment.recentlyModified,
}
statisticData = append(statisticData, &stat)
func (container *ColSegContainer) addSegment(collection *Collection, partition *Partition, segmentID int64) (*Segment, error) {
if collection == nil {
return nil, errors.New("null collection")
}
return &internalpb.QueryNodeSegStats{
MsgType: internalpb.MsgType_kQueryNodeSegStats,
SegStats: statisticData,
if partition == nil {
return nil, errors.New("null partition")
}
}
func (container *colSegContainer) addSegment(segmentID UniqueID, partitionTag string, collectionID UniqueID) error {
collection, err := container.getCollectionByID(collectionID)
if err != nil {
return err
}
partition, err := container.getPartitionByTag(collectionID, partitionTag)
if err != nil {
return err
}
container.mu.Lock()
defer container.mu.Unlock()
var newSegment = newSegment(collection, segmentID)
container.segments[segmentID] = newSegment
*partition.Segments() = append(*partition.Segments(), newSegment)
return nil
}
func (container *colSegContainer) removeSegment(segmentID UniqueID) error {
container.mu.Lock()
defer container.mu.Unlock()
var targetPartition *Partition
var segmentIndex = -1
for _, col := range container.collections {
for _, p := range *col.Partitions() {
for i, s := range *p.Segments() {
if s.ID() == segmentID {
targetPartition = p
segmentIndex = i
if col.ID() == collection.ID() {
for _, p := range *col.Partitions() {
if p.Tag() == partition.Tag() {
*p.Segments() = append(*p.Segments(), newSegment)
return newSegment, nil
}
}
}
}
delete(container.segments, segmentID)
if targetPartition != nil && segmentIndex > 0 {
targetPartition.segments = append(targetPartition.segments[:segmentIndex], targetPartition.segments[segmentIndex+1:]...)
}
return nil
return nil, errors.New("cannot find collection or segment")
}
func (container *colSegContainer) getSegmentByID(segmentID UniqueID) (*Segment, error) {
container.mu.RLock()
defer container.mu.RUnlock()
func (container *ColSegContainer) removeSegment(segment *Segment) error {
var targetPartition *Partition
var tmpSegments = make([]*Segment, 0)
var hasSegment = false
for _, col := range container.collections {
for _, p := range *col.Partitions() {
for _, s := range *p.Segments() {
if s.ID() == segment.ID() {
targetPartition = p
hasSegment = true
delete(container.segments, segment.ID())
} else {
tmpSegments = append(tmpSegments, s)
}
}
}
}
if hasSegment && targetPartition != nil {
*targetPartition.Segments() = tmpSegments
return nil
}
return errors.New("cannot found segment, id = " + strconv.FormatInt(segment.ID(), 10))
}
func (container *ColSegContainer) getSegmentByID(segmentID int64) (*Segment, error) {
targetSegment, ok := container.segments[segmentID]
if !ok {
@ -276,10 +204,7 @@ func (container *colSegContainer) getSegmentByID(segmentID UniqueID) (*Segment,
return targetSegment, nil
}
func (container *colSegContainer) hasSegment(segmentID UniqueID) bool {
container.mu.RLock()
defer container.mu.RUnlock()
func (container *ColSegContainer) hasSegment(segmentID int64) bool {
_, ok := container.segments[segmentID]
return ok
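This file replaces the unexported container interface with a concrete ColSegContainer whose methods take and return handles (*Collection, *Partition, *Segment) instead of raw IDs and tags. A condensed usage sketch inside package reader, assuming the types and imports visible in this patch; exampleContainerFlow is an illustrative name.

```go
package reader

import (
	"github.com/zilliztech/milvus-distributed/internal/errors"
	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
)

// exampleContainerFlow walks the new handle-based API end to end.
func exampleContainerFlow(container *ColSegContainer, meta *etcdpb.CollectionMeta, blob string) error {
	// addCollection now returns the *Collection handle directly.
	collection := container.addCollection(meta, blob)

	// Partition and segment creation take the handles they attach to.
	partition, err := container.addPartition(collection, "default")
	if err != nil {
		return err
	}
	segment, err := container.addSegment(collection, partition, 1)
	if err != nil {
		return err
	}

	if !container.hasSegment(segment.ID()) {
		return errors.New("segment was not registered")
	}

	// Removal also takes handles rather than IDs or tags.
	if err := container.removeSegment(segment); err != nil {
		return err
	}
	return container.removeCollection(collection)
}
```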

View File

@ -6,7 +6,6 @@ import (
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
@ -18,125 +17,6 @@ func TestColSegContainer_addCollection(t *testing.T) {
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
},
},
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "1",
},
},
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
}
collectionMeta := etcdpb.CollectionMeta{
ID: UniqueID(0),
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
PartitionTags: []string{"default"},
}
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
}
func TestColSegContainer_removeCollection(t *testing.T) {
ctx := context.Background()
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
},
},
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "1",
},
},
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
}
collectionMeta := etcdpb.CollectionMeta{
ID: collectionID,
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
PartitionTags: []string{"default"},
}
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
err = (*node.container).removeCollection(collectionID)
assert.NoError(t, err)
assert.Equal(t, (*node.container).getCollectionNum(), 0)
}
func TestColSegContainer_getCollectionByID(t *testing.T) {
ctx := context.Background()
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
@ -177,17 +57,121 @@ func TestColSegContainer_getCollectionByID(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob)
collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, len(node.container.collections), 1)
}
targetCollection, err := (*node.container).getCollectionByID(UniqueID(0))
func TestColSegContainer_removeCollection(t *testing.T) {
ctx := context.Background()
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
},
},
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "1",
},
},
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
}
collectionMeta := etcdpb.CollectionMeta{
ID: UniqueID(0),
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
PartitionTags: []string{"default"},
}
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, len(node.container.collections), 1)
err := node.container.removeCollection(collection)
assert.NoError(t, err)
assert.Equal(t, len(node.container.collections), 0)
}
func TestColSegContainer_getCollectionByID(t *testing.T) {
ctx := context.Background()
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
},
},
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "1",
},
},
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
}
collectionMeta := etcdpb.CollectionMeta{
ID: UniqueID(0),
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
PartitionTags: []string{"default"},
}
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, len(node.container.collections), 1)
targetCollection, err := node.container.getCollectionByID(UniqueID(0))
assert.NoError(t, err)
assert.NotNil(t, targetCollection)
assert.Equal(t, targetCollection.meta.Schema.Name, "collection0")
@ -199,7 +183,6 @@ func TestColSegContainer_getCollectionByName(t *testing.T) {
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
@ -240,17 +223,13 @@ func TestColSegContainer_getCollectionByName(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob)
collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, len(node.container.collections), 1)
targetCollection, err := (*node.container).getCollectionByName("collection0")
targetCollection, err := node.container.getCollectionByName("collection0")
assert.NoError(t, err)
assert.NotNil(t, targetCollection)
assert.Equal(t, targetCollection.meta.Schema.Name, "collection0")
@ -263,8 +242,6 @@ func TestColSegContainer_addPartition(t *testing.T) {
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
@ -295,7 +272,7 @@ func TestColSegContainer_addPartition(t *testing.T) {
}
collectionMeta := etcdpb.CollectionMeta{
ID: collectionID,
ID: UniqueID(0),
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
@ -305,22 +282,16 @@ func TestColSegContainer_addPartition(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob)
collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, collectionID)
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, len(node.container.collections), 1)
for _, tag := range collectionMeta.PartitionTags {
err := (*node.container).addPartition(collectionID, tag)
targetPartition, err := node.container.addPartition(collection, tag)
assert.NoError(t, err)
partition, err := (*node.container).getPartitionByTag(collectionID, tag)
assert.NoError(t, err)
assert.Equal(t, partition.partitionTag, "default")
assert.Equal(t, targetPartition.partitionTag, "default")
}
}
@ -329,9 +300,6 @@ func TestColSegContainer_removePartition(t *testing.T) {
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0"
collectionID := UniqueID(0)
partitionTag := "default"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
@ -362,33 +330,27 @@ func TestColSegContainer_removePartition(t *testing.T) {
}
collectionMeta := etcdpb.CollectionMeta{
ID: collectionID,
ID: UniqueID(0),
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
PartitionTags: []string{partitionTag},
PartitionTags: []string{"default"},
}
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob)
collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, collectionID)
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, len(node.container.collections), 1)
for _, tag := range collectionMeta.PartitionTags {
err := (*node.container).addPartition(collectionID, tag)
targetPartition, err := node.container.addPartition(collection, tag)
assert.NoError(t, err)
partition, err := (*node.container).getPartitionByTag(collectionID, tag)
assert.NoError(t, err)
assert.Equal(t, partition.partitionTag, partitionTag)
err = (*node.container).removePartition(collectionID, partitionTag)
assert.Equal(t, targetPartition.partitionTag, "default")
err = node.container.removePartition(targetPartition)
assert.NoError(t, err)
}
}
@ -398,8 +360,6 @@ func TestColSegContainer_getPartitionByTag(t *testing.T) {
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
@ -430,7 +390,7 @@ func TestColSegContainer_getPartitionByTag(t *testing.T) {
}
collectionMeta := etcdpb.CollectionMeta{
ID: collectionID,
ID: UniqueID(0),
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
@ -440,23 +400,20 @@ func TestColSegContainer_getPartitionByTag(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob)
collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, collectionID)
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, len(node.container.collections), 1)
for _, tag := range collectionMeta.PartitionTags {
err := (*node.container).addPartition(collectionID, tag)
targetPartition, err := node.container.addPartition(collection, tag)
assert.NoError(t, err)
partition, err := (*node.container).getPartitionByTag(collectionID, tag)
assert.Equal(t, targetPartition.partitionTag, "default")
partition, err := node.container.getPartitionByTag(collectionMeta.Schema.Name, tag)
assert.NoError(t, err)
assert.Equal(t, partition.partitionTag, "default")
assert.NotNil(t, partition)
assert.Equal(t, partition.partitionTag, "default")
}
}
@ -466,8 +423,6 @@ func TestColSegContainer_addSegment(t *testing.T) {
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
@ -498,7 +453,7 @@ func TestColSegContainer_addSegment(t *testing.T) {
}
collectionMeta := etcdpb.CollectionMeta{
ID: collectionID,
ID: UniqueID(0),
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
@ -508,24 +463,18 @@ func TestColSegContainer_addSegment(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob)
collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, len(node.container.collections), 1)
err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0])
partition, err := node.container.addPartition(collection, collectionMeta.PartitionTags[0])
assert.NoError(t, err)
const segmentNum = 3
for i := 0; i < segmentNum; i++ {
err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
assert.NoError(t, err)
targetSeg, err := (*node.container).getSegmentByID(UniqueID(i))
targetSeg, err := node.container.addSegment(collection, partition, UniqueID(i))
assert.NoError(t, err)
assert.Equal(t, targetSeg.segmentID, UniqueID(i))
}
@ -536,8 +485,6 @@ func TestColSegContainer_removeSegment(t *testing.T) {
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
@ -568,7 +515,7 @@ func TestColSegContainer_removeSegment(t *testing.T) {
}
collectionMeta := etcdpb.CollectionMeta{
ID: collectionID,
ID: UniqueID(0),
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
@ -578,27 +525,21 @@ func TestColSegContainer_removeSegment(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob)
collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, len(node.container.collections), 1)
err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0])
partition, err := node.container.addPartition(collection, collectionMeta.PartitionTags[0])
assert.NoError(t, err)
const segmentNum = 3
for i := 0; i < segmentNum; i++ {
err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
assert.NoError(t, err)
targetSeg, err := (*node.container).getSegmentByID(UniqueID(i))
targetSeg, err := node.container.addSegment(collection, partition, UniqueID(i))
assert.NoError(t, err)
assert.Equal(t, targetSeg.segmentID, UniqueID(i))
err = (*node.container).removeSegment(UniqueID(i))
err = node.container.removeSegment(targetSeg)
assert.NoError(t, err)
}
}
@ -608,8 +549,6 @@ func TestColSegContainer_getSegmentByID(t *testing.T) {
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
@ -640,7 +579,7 @@ func TestColSegContainer_getSegmentByID(t *testing.T) {
}
collectionMeta := etcdpb.CollectionMeta{
ID: collectionID,
ID: UniqueID(0),
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
@ -650,26 +589,23 @@ func TestColSegContainer_getSegmentByID(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob)
collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, len(node.container.collections), 1)
err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0])
partition, err := node.container.addPartition(collection, collectionMeta.PartitionTags[0])
assert.NoError(t, err)
const segmentNum = 3
for i := 0; i < segmentNum; i++ {
err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
assert.NoError(t, err)
targetSeg, err := (*node.container).getSegmentByID(UniqueID(i))
targetSeg, err := node.container.addSegment(collection, partition, UniqueID(i))
assert.NoError(t, err)
assert.Equal(t, targetSeg.segmentID, UniqueID(i))
seg, err := node.container.getSegmentByID(UniqueID(i))
assert.NoError(t, err)
assert.Equal(t, seg.segmentID, UniqueID(i))
}
}
@ -678,8 +614,6 @@ func TestColSegContainer_hasSegment(t *testing.T) {
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
@ -710,7 +644,7 @@ func TestColSegContainer_hasSegment(t *testing.T) {
}
collectionMeta := etcdpb.CollectionMeta{
ID: collectionID,
ID: UniqueID(0),
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
@ -720,29 +654,23 @@ func TestColSegContainer_hasSegment(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob)
collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, len(node.container.collections), 1)
err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0])
partition, err := node.container.addPartition(collection, collectionMeta.PartitionTags[0])
assert.NoError(t, err)
const segmentNum = 3
for i := 0; i < segmentNum; i++ {
err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
assert.NoError(t, err)
targetSeg, err := (*node.container).getSegmentByID(UniqueID(i))
targetSeg, err := node.container.addSegment(collection, partition, UniqueID(i))
assert.NoError(t, err)
assert.Equal(t, targetSeg.segmentID, UniqueID(i))
hasSeg := (*node.container).hasSegment(UniqueID(i))
hasSeg := node.container.hasSegment(UniqueID(i))
assert.Equal(t, hasSeg, true)
hasSeg = (*node.container).hasSegment(UniqueID(i + 100))
hasSeg = node.container.hasSegment(UniqueID(i + 100))
assert.Equal(t, hasSeg, false)
}
}

View File

@ -16,7 +16,6 @@ func TestCollection_Partitions(t *testing.T) {
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
@ -40,7 +39,7 @@ func TestCollection_Partitions(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Name: "collection0",
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -57,18 +56,14 @@ func TestCollection_Partitions(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err)
var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, len(node.container.collections), 1)
for _, tag := range collectionMeta.PartitionTags {
err := (*node.container).addPartition(collection.ID(), tag)
_, err := node.container.addPartition(collection, tag)
assert.NoError(t, err)
}

View File

@ -51,7 +51,7 @@ func (dsService *dataSyncService) initNodes() {
var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.pulsarURL)
var filterDmNode Node = newFilteredDmNode()
var insertNode Node = newInsertNode(dsService.node.container)
var insertNode Node = newInsertNode(&dsService.node.container.segments)
var serviceTimeNode Node = newServiceTimeNode(dsService.node)
dsService.fg.AddNode(&dmStreamNode)

View File

@ -9,12 +9,12 @@ import (
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
const ctxTimeInMillisecond = 2000
@ -38,7 +38,6 @@ func TestManipulationService_Start(t *testing.T) {
node := NewQueryNode(ctx, 0, pulsarURL)
// init meta
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
@ -62,7 +61,7 @@ func TestManipulationService_Start(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Name: "collection0",
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -79,21 +78,18 @@ func TestManipulationService_Start(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err)
var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, len(node.container.collections), 1)
err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
partition, err := node.container.addPartition(collection, collectionMeta.PartitionTags[0])
assert.NoError(t, err)
segmentID := UniqueID(0)
err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
targetSeg, err := node.container.addSegment(collection, partition, segmentID)
assert.NoError(t, err)
assert.Equal(t, targetSeg.segmentID, segmentID)
// test data generate
const msgLength = 10

View File

@ -1,8 +1,10 @@
package reader
import (
"errors"
"fmt"
"log"
"strconv"
"sync"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@ -10,7 +12,7 @@ import (
type insertNode struct {
BaseNode
container *container
segmentsMap *map[int64]*Segment
}
type InsertData struct {
@ -60,7 +62,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
// 2. do preInsert
for segmentID := range insertData.insertRecords {
var targetSegment, err = (*iNode.container).getSegmentByID(segmentID)
var targetSegment, err = iNode.getSegmentBySegmentID(segmentID)
if err != nil {
log.Println("preInsert failed")
// TODO: add error handling
@ -87,8 +89,18 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
return []*Msg{&res}
}
func (iNode *insertNode) getSegmentBySegmentID(segmentID int64) (*Segment, error) {
targetSegment, ok := (*iNode.segmentsMap)[segmentID]
if !ok {
return nil, errors.New("cannot found segment with id = " + strconv.FormatInt(segmentID, 10))
}
return targetSegment, nil
}
func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) {
var targetSegment, err = (*iNode.container).getSegmentByID(segmentID)
var targetSegment, err = iNode.getSegmentBySegmentID(segmentID)
if err != nil {
log.Println("cannot find segment:", segmentID)
// TODO: add error handling
@ -111,13 +123,13 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn
wg.Done()
}
func newInsertNode(container *container) *insertNode {
func newInsertNode(segmentsMap *map[int64]*Segment) *insertNode {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &insertNode{
BaseNode: baseNode,
container: container,
BaseNode: baseNode,
segmentsMap: segmentsMap,
}
}
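The insert node no longer holds the whole container: it keeps a pointer to the container's segmentID-to-*Segment map and resolves segments locally through getSegmentBySegmentID. A wiring sketch assuming the QueryNode and ColSegContainer fields from this patch; buildInsertNode is an illustrative name.

```go
package reader

// buildInsertNode shows how the flow graph now hands the insert node only the
// segment index it needs, mirroring the dataSyncService change above.
func buildInsertNode(node *QueryNode) *insertNode {
	return newInsertNode(&node.container.segments)
}
```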

View File

@ -27,10 +27,10 @@ const (
type metaService struct {
ctx context.Context
kvBase *kv.EtcdKV
container *container
container *ColSegContainer
}
func newMetaService(ctx context.Context, container *container) *metaService {
func newMetaService(ctx context.Context, container *ColSegContainer) *metaService {
ETCDAddr := "http://"
ETCDAddr += conf.Config.Etcd.Address
ETCDPort := conf.Config.Etcd.Port
@ -143,12 +143,9 @@ func (mService *metaService) processCollectionCreate(id string, value string) {
col := mService.collectionUnmarshal(value)
if col != nil {
err := (*mService.container).addCollection(col, value)
if err != nil {
log.Println(err)
}
newCollection := mService.container.addCollection(col, value)
for _, partitionTag := range col.PartitionTags {
err = (*mService.container).addPartition(col.ID, partitionTag)
_, err := mService.container.addPartition(newCollection, partitionTag)
if err != nil {
log.Println(err)
}
@ -166,11 +163,25 @@ func (mService *metaService) processSegmentCreate(id string, value string) {
// TODO: what if seg == nil? We need to notify master and return rpc request failed
if seg != nil {
err := (*mService.container).addSegment(seg.SegmentID, seg.PartitionTag, seg.CollectionID)
var col, err = mService.container.getCollectionByID(seg.CollectionID)
if err != nil {
log.Println(err)
return
}
if col != nil {
var partition, err = mService.container.getPartitionByTag(col.Name(), seg.PartitionTag)
if err != nil {
log.Println(err)
return
}
if partition != nil {
_, err = mService.container.addSegment(col, partition, seg.SegmentID)
if err != nil {
log.Println(err)
return
}
}
}
}
}
@ -195,7 +206,7 @@ func (mService *metaService) processSegmentModify(id string, value string) {
}
if seg != nil {
targetSegment, err := (*mService.container).getSegmentByID(seg.SegmentID)
targetSegment, err := mService.container.getSegmentByID(seg.SegmentID)
if err != nil {
log.Println(err)
return
@ -230,7 +241,13 @@ func (mService *metaService) processSegmentDelete(id string) {
log.Println("Cannot parse segment id:" + id)
}
err = (*mService.container).removeSegment(segmentID)
seg, err := mService.container.getSegmentByID(segmentID)
if err != nil {
log.Println(err)
return
}
err = mService.container.removeSegment(seg)
if err != nil {
log.Println(err)
return
@ -245,7 +262,13 @@ func (mService *metaService) processCollectionDelete(id string) {
log.Println("Cannot parse collection id:" + id)
}
err = (*mService.container).removeCollection(collectionID)
targetCollection, err := mService.container.getCollectionByID(collectionID)
if err != nil {
log.Println(err)
return
}
err = mService.container.removeCollection(targetCollection)
if err != nil {
log.Println(err)
return
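processSegmentCreate now resolves the collection handle by ID, then the partition by tag, before registering the segment; the delete paths likewise look up a handle first. A condensed sketch of that chain, assuming the reader-package types above; registerSegment is an illustrative name.

```go
package reader

// registerSegment mirrors the lookup chain used when a segment-create event
// arrives: collection by ID, partition by tag, then addSegment with handles.
func registerSegment(container *ColSegContainer, collectionID int64, partitionTag string, segmentID int64) error {
	col, err := container.getCollectionByID(collectionID)
	if err != nil {
		return err
	}
	partition, err := container.getPartitionByTag(col.Name(), partitionTag)
	if err != nil {
		return err
	}
	_, err = container.addSegment(col, partition, segmentID)
	return err
}
```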

View File

@ -16,7 +16,6 @@ func TestPartition_Segments(t *testing.T) {
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
@ -40,7 +39,7 @@ func TestPartition_Segments(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Name: "collection0",
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -57,17 +56,14 @@ func TestPartition_Segments(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob)
collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, len(node.container.collections), 1)
for _, tag := range collectionMeta.PartitionTags {
err := (*node.container).addPartition(collection.ID(), tag)
_, err := node.container.addPartition(collection, tag)
assert.NoError(t, err)
}
@ -78,7 +74,7 @@ func TestPartition_Segments(t *testing.T) {
const segmentNum = 3
for i := 0; i < segmentNum; i++ {
err := (*node.container).addSegment(UniqueID(i), targetPartition.partitionTag, collection.ID())
_, err := node.container.addSegment(collection, targetPartition, UniqueID(i))
assert.NoError(t, err)
}

View File

@ -24,7 +24,7 @@ type QueryNode struct {
tSafe Timestamp
container *container
container *ColSegContainer
dataSyncService *dataSyncService
metaService *metaService
@ -36,11 +36,6 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64, pulsarURL string) *Qu
segmentsMap := make(map[int64]*Segment)
collections := make([]*Collection, 0)
var container container = &colSegContainer{
collections: collections,
segments: segmentsMap,
}
return &QueryNode{
ctx: ctx,
@ -49,7 +44,10 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64, pulsarURL string) *Qu
tSafe: 0,
container: &container,
container: &ColSegContainer{
collections: collections,
segments: segmentsMap,
},
dataSyncService: nil,
metaService: nil,
@ -60,7 +58,7 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64, pulsarURL string) *Qu
func (node *QueryNode) Start() {
node.dataSyncService = newDataSyncService(node.ctx, node, node.pulsarURL)
node.searchService = newSearchService(node.ctx, node, node.pulsarURL)
node.searchService = newSearchService(node.ctx, node.container, node.pulsarURL)
node.metaService = newMetaService(node.ctx, node.container)
node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL)

View File

@ -14,7 +14,7 @@ type searchService struct {
ctx context.Context
pulsarURL string
node *QueryNode
container *ColSegContainer
searchMsgStream *msgstream.MsgStream
searchResultMsgStream *msgstream.MsgStream
@ -33,13 +33,13 @@ type SearchResult struct {
ResultDistances []float32
}
func newSearchService(ctx context.Context, node *QueryNode, pulsarURL string) *searchService {
func newSearchService(ctx context.Context, container *ColSegContainer, pulsarURL string) *searchService {
return &searchService{
ctx: ctx,
pulsarURL: pulsarURL,
node: node,
container: container,
searchMsgStream: nil,
searchResultMsgStream: nil,

View File

@ -14,10 +14,10 @@ import (
type statsService struct {
ctx context.Context
msgStream *msgstream.PulsarMsgStream
container *container
container *ColSegContainer
}
func newStatsService(ctx context.Context, container *container, pulsarAddress string) *statsService {
func newStatsService(ctx context.Context, container *ColSegContainer, pulsarAddress string) *statsService {
// TODO: add pulsar message stream init
return &statsService{
@ -41,13 +41,29 @@ func (sService *statsService) start() {
}
func (sService *statsService) sendSegmentStatistic() {
var statisticData = (*sService.container).getSegmentStatistics()
var statisticData = make([]internalpb.SegmentStats, 0)
for segmentID, segment := range sService.container.segments {
currentMemSize := segment.getMemSize()
segment.lastMemSize = currentMemSize
segmentNumOfRows := segment.getRowCount()
stat := internalpb.SegmentStats{
// TODO: set master pb's segment id type from uint64 to int64
SegmentID: segmentID,
MemorySize: currentMemSize,
NumRows: segmentNumOfRows,
}
statisticData = append(statisticData, stat)
}
// fmt.Println("Publish segment statistic")
// fmt.Println(statisticData)
sService.publicStatistic(statisticData)
sService.publicStatistic(&statisticData)
}
func (sService *statsService) publicStatistic(statistic *internalpb.QueryNodeSegStats) {
func (sService *statsService) publicStatistic(statistic *[]internalpb.SegmentStats) {
// TODO: publish statistic
}
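With getSegmentStatistics removed from the container, the stats service iterates the segment map itself and builds the SegmentStats slice inline. A sketch of that collection step, assuming the reader-package types from this patch; collectSegmentStats is an illustrative name.

```go
package reader

import (
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)

// collectSegmentStats gathers per-segment memory and row counts, updating
// lastMemSize the same way sendSegmentStatistic does above.
func collectSegmentStats(container *ColSegContainer) []internalpb.SegmentStats {
	stats := make([]internalpb.SegmentStats, 0, len(container.segments))
	for segmentID, segment := range container.segments {
		currentMemSize := segment.getMemSize()
		segment.lastMemSize = currentMemSize
		stats = append(stats, internalpb.SegmentStats{
			SegmentID:  segmentID,
			MemorySize: currentMemSize,
			NumRows:    segment.getRowCount(),
		})
	}
	return stats
}
```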