Implment data service

Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/4973/head^2
sunby 2021-01-23 14:41:29 +08:00 committed by yefu.chen
parent 2f7319cdbb
commit c23e2de435
6 changed files with 96 additions and 12 deletions

View File

@ -23,7 +23,7 @@ type (
collectionInfo struct {
ID UniqueID
Schema *schemapb.CollectionSchema
partitions []UniqueID
Partitions []UniqueID
}
meta struct {
client kv.TxnBase // client of a reliable kv service, i.e. etcd client
@ -146,7 +146,7 @@ func (meta *meta) BuildSegment(collectionID UniqueID, partitionID UniqueID, chan
func (meta *meta) AddSegment(segmentInfo *datapb.SegmentInfo) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
if _, ok := meta.segID2Info[segmentInfo.SegmentID]; !ok {
if _, ok := meta.segID2Info[segmentInfo.SegmentID]; ok {
return fmt.Errorf("segment %d already exist", segmentInfo.SegmentID)
}
meta.segID2Info[segmentInfo.SegmentID] = segmentInfo
@ -273,12 +273,12 @@ func (meta *meta) AddPartition(collectionID UniqueID, partitionID UniqueID) erro
return newErrCollectionNotFound(collectionID)
}
for _, t := range coll.partitions {
for _, t := range coll.Partitions {
if t == partitionID {
return errors.Errorf("partition %d already exists.", partitionID)
}
}
coll.partitions = append(coll.partitions, partitionID)
coll.Partitions = append(coll.Partitions, partitionID)
return nil
}
@ -292,7 +292,7 @@ func (meta *meta) DropPartition(collID UniqueID, partitionID UniqueID) error {
}
idx := -1
for i, id := range collection.partitions {
for i, id := range collection.Partitions {
if partitionID == id {
idx = i
break
@ -303,7 +303,7 @@ func (meta *meta) DropPartition(collID UniqueID, partitionID UniqueID) error {
return fmt.Errorf("cannot find partition id %d", partitionID)
}
collection.partitions = append(collection.partitions[:idx], collection.partitions[idx+1:]...)
collection.Partitions = append(collection.Partitions[:idx], collection.Partitions[idx+1:]...)
return nil
}

View File

@ -0,0 +1,41 @@
package dataservice
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestCollection(t *testing.T) {
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
testSchema := newTestSchema()
id, err := mockAllocator.allocID()
assert.Nil(t, err)
err = meta.AddCollection(&collectionInfo{
ID: id,
Schema: testSchema,
Partitions: []UniqueID{100},
})
assert.Nil(t, err)
err = meta.AddCollection(&collectionInfo{
ID: id,
Schema: testSchema,
})
assert.NotNil(t, err)
has := meta.HasCollection(id)
assert.True(t, has)
collection, err := meta.GetCollection(id)
assert.Nil(t, err)
assert.EqualValues(t, id, collection.ID)
assert.EqualValues(t, testSchema, collection.Schema)
assert.EqualValues(t, 1, len(collection.Partitions))
assert.EqualValues(t, 100, collection.Partitions[0])
err = meta.DropCollection(id)
assert.Nil(t, err)
has = meta.HasCollection(id)
assert.False(t, has)
_, err = meta.GetCollection(id)
assert.NotNil(t, err)
}

View File

@ -35,7 +35,7 @@ func newMockAllocator() *MockAllocator {
return &MockAllocator{}
}
func NewTestSchema() *schemapb.CollectionSchema {
func newTestSchema() *schemapb.CollectionSchema {
return &schemapb.CollectionSchema{
Name: "test",
Description: "schema for test used",

View File

@ -17,8 +17,9 @@ func TestAllocSegment(t *testing.T) {
segAllocator, err := newSegmentAllocator(meta, mockAllocator)
assert.Nil(t, err)
schema := NewTestSchema()
schema := newTestSchema()
collID, err := mockAllocator.allocID()
assert.Nil(t, err)
err = meta.AddCollection(&collectionInfo{
ID: collID,
Schema: schema,
@ -65,8 +66,9 @@ func TestSealSegment(t *testing.T) {
segAllocator, err := newSegmentAllocator(meta, mockAllocator)
assert.Nil(t, err)
schema := NewTestSchema()
schema := newTestSchema()
collID, err := mockAllocator.allocID()
assert.Nil(t, err)
err = meta.AddCollection(&collectionInfo{
ID: collID,
Schema: schema,
@ -90,7 +92,7 @@ func TestSealSegment(t *testing.T) {
assert.EqualValues(t, 0, len(ids))
sealedSegments, err := segAllocator.GetSealedSegments()
assert.Nil(t, err)
assert.EqualValues(t, 10, sealedSegments)
assert.EqualValues(t, 10, len(sealedSegments))
}
func TestExpireSegment(t *testing.T) {
@ -101,8 +103,9 @@ func TestExpireSegment(t *testing.T) {
segAllocator, err := newSegmentAllocator(meta, mockAllocator)
assert.Nil(t, err)
schema := NewTestSchema()
schema := newTestSchema()
collID, err := mockAllocator.allocID()
assert.Nil(t, err)
err = meta.AddCollection(&collectionInfo{
ID: collID,
Schema: schema,

View File

@ -204,7 +204,7 @@ func (s *Server) loadMetaFromMaster() error {
err = s.meta.AddCollection(&collectionInfo{
ID: collection.CollectionID,
Schema: collection.Schema,
partitions: partitions.PartitionIDs,
Partitions: partitions.PartitionIDs,
})
if err != nil {
log.Println(err.Error())

View File

@ -2,6 +2,9 @@ package dataservice
import (
"context"
"time"
"google.golang.org/grpc"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@ -11,8 +14,45 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
const (
timeout = 30 * time.Second
retry = 3
)
type Client struct {
grpcClient datapb.DataServiceClient
conn *grpc.ClientConn
addr string
}
func NewClient(addr string) *Client {
return &Client{
addr: addr,
}
}
func (c *Client) Init() error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
var err error
for i := 0; i < retry; i++ {
if c.conn, err = grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock()); err == nil {
break
}
}
if err != nil {
return err
}
c.grpcClient = datapb.NewDataServiceClient(c.conn)
return nil
}
func (c *Client) Start() error {
return nil
}
func (c *Client) Stop() error {
return c.conn.Close()
}
func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) {